You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/09/12 17:07:11 UTC
[metron] branch feature/METRON-2088-support-hdp-3.1 updated:
METRON-2188 Upgrade to HBase 2.0.2 (nickwallen) closes apache/metron#1506
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push:
new 802cbdd METRON-2188 Upgrade to HBase 2.0.2 (nickwallen) closes apache/metron#1506
802cbdd is described below
commit 802cbdd71a972f281a445e8025040f863987964f
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Thu Sep 12 13:06:39 2019 -0400
METRON-2188 Upgrade to HBase 2.0.2 (nickwallen) closes apache/metron#1506
---
dependencies_with_url.csv | 33 +
metron-analytics/metron-profiler-storm/pom.xml | 1 +
.../CURRENT/package/scripts/enrichment_commands.py | 2 +-
.../UpdateControllerIntegrationTest.java | 4 +
metron-platform/metron-common/pom.xml | 7 +
metron-platform/metron-data-management/pom.xml | 32 +-
.../mr/LeastRecentlyUsedPrunerIntegrationTest.java | 5 +-
...pleEnrichmentFlatFileLoaderIntegrationTest.java | 3 +-
.../metron-elasticsearch-common/pom.xml | 12 +
.../enrichment/adapters/cif/CIFHbaseAdapter.java | 8 +-
.../enrichment/converter/AbstractConverter.java | 5 +-
.../lookup/accesstracker/AccessTrackerUtil.java | 2 +-
.../simplehbase/SimpleHBaseAdapterTest.java | 2 +-
.../metron-enrichment-storm/pom.xml | 36 +-
metron-platform/metron-hbase-server/pom.xml | 6 +
.../hbase/coprocessor/EnrichmentCoprocessor.java | 17 +-
.../EnrichmentCoprocessorIntegrationTest.java | 4 +-
.../metron-hbase/metron-hbase-common/pom.xml | 17 +
.../java/org/apache/metron/hbase/HBaseUtils.java | 49 +
.../apache/metron/hbase/client/HBaseClient.java | 4 +-
.../metron/hbase/client/HBaseClientTest.java | 5 +-
.../metron/hbase/mock/MockHBaseTableProvider.java | 3 +-
.../org/apache/metron/hbase/mock/MockHTable.java | 1345 +++++++++++---------
.../metron-parsing/metron-parsers/pom.xml | 9 +
metron-stellar/stellar-common/pom.xml | 7 +
pom.xml | 4 +
26 files changed, 942 insertions(+), 680 deletions(-)
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 70bfb85..623a021 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -81,7 +81,9 @@ org.json4s:json4s-ast_2.11:jar:3.2.11:compile,ASLv2,https://github.com/json4s/js
org.json4s:json4s-core_2.11:jar:3.2.11:compile,ASLv2,https://github.com/json4s/json4s
org.json4s:json4s-jackson_2.11:jar:3.2.11:compile,ASLv2,https://github.com/json4s/json4s
org.jruby.jcodings:jcodings:jar:1.0.8:compile,MIT License,https://github.com/jruby/jcodings
+org.jruby.jcodings:jcodings:jar:1.0.18:compile,MIT License,https://github.com/jruby/jcodings
org.jruby.joni:joni:jar:2.1.2:compile,MIT License,https://github.com/jruby/joni
+org.jruby.joni:joni:jar:2.1.11:compile,MIT License,https://github.com/jruby/joni
org.lz4:lz4-java:jar:1.4.0:compile,ASLv2,https://github.com/lz4/lz4-java
org.mitre.taxii:taxii:jar:1.1.0.1:compile,The BSD 3-Clause License,https://github.com/TAXIIProject/java-taxii
org.mitre:stix:jar:1.2.0.2:compile,The BSD 3-Clause License,https://github.com/STIXProject/java-stix
@@ -107,9 +109,11 @@ com.sun.jersey:jersey-server:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
com.thoughtworks.paranamer:paranamer:jar:2.3:compile,BSD,https://github.com/paul-hammant/paranamer
javax.servlet.jsp:jsp-api:jar:2.1:runtime,CDDL,http://oracle.com
javax.servlet.jsp:jsp-api:jar:2.1:compile,CDDL,http://oracle.com
+javax.servlet.jsp:javax.servlet.jsp-api:jar:2.3.1:compile,CDDL,http://oracle.com
javax.servlet:servlet-api:jar:2.5:compile,CDDL,http://oracle.com
net.jcip:jcip-annotations:jar:1.0:compile,Public,http://jcip.net/
org.codehaus.jettison:jettison:jar:1.1:compile,ASLv2,https://github.com/codehaus/jettison
+org.codehaus.jettison:jettison:jar:1.3.8:compile,ASLv2,https://github.com/codehaus/jettison
org.fusesource.leveldbjni:leveldbjni-all:jar:1.8:compile,BSD 3-clause,https://github.com/fusesource/leveldbjni
org.hamcrest:hamcrest-core:jar:1.1:runtime,BSD 2-clause,http://hamcrest.org/JavaHamcrest/
org.hamcrest:hamcrest-core:jar:1.3:compile,BSD 2-clause,http://hamcrest.org/JavaHamcrest/
@@ -146,6 +150,7 @@ com.101tec:zkclient:jar:0.10:compile,The Apache Software License, Version 2.0,ht
com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile,Apache License, Version 2.0,http://stephenc.github.com/findbugs-annotations
com.github.tony19:named-regexp:jar:0.2.3:compile,Apache License, Version 2.0,
com.google.code.findbugs:jsr305:jar:1.3.9:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/
+com.google.code.findbugs:jsr305:jar:2.0.1:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/
com.google.code.findbugs:jsr305:jar:3.0.0:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/
com.google.code.findbugs:annotations:jar:2.0.1:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/
com.carrotsearch:hppc:jar:0.7.1:compile,ASLv2,https://github.com/carrotsearch/hppc
@@ -165,6 +170,7 @@ com.fasterxml.jackson.core:jackson-core:jar:2.6.3:compile,ASLv2,https://github.c
com.fasterxml.jackson.core:jackson-core:jar:2.6.6:compile,ASLv2,https://github.com/FasterXML/jackson-core
com.fasterxml.jackson.core:jackson-core:jar:2.7.4:compile,ASLv2,https://github.com/FasterXML/jackson-core
com.fasterxml.jackson.core:jackson-core:jar:2.8.3:compile,ASLv2,https://github.com/FasterXML/jackson-core
+com.fasterxml.jackson.core:jackson-core:jar:2.9.2:compile,ASLv2,https://github.com/FasterXML/jackson-core
com.fasterxml.jackson.core:jackson-core:jar:2.9.4:compile,ASLv2,https://github.com/FasterXML/jackson-core
com.fasterxml.jackson.core:jackson-core:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-core
com.fasterxml.jackson.core:jackson-databind:jar:2.2.3:compile,ASLv2,http://wiki.fasterxml.com/JacksonHome
@@ -172,6 +178,7 @@ com.fasterxml.jackson.core:jackson-databind:jar:2.4.3:compile,ASLv2,http://githu
com.fasterxml.jackson.core:jackson-databind:jar:2.6.7.1:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson
+com.fasterxml.jackson.core:jackson-databind:jar:2.9.2:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.9.4:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.9.5:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.9.6:compile,ASLv2,http://github.com/FasterXML/jackson
@@ -209,6 +216,7 @@ com.google.inject:guice:jar:3.0:compile,ASLv2,
com.googlecode.disruptor:disruptor:jar:2.10.4:compile,The Apache Software License, Version 2.0,http://code.google.com/p/disruptor/
com.lmax:disruptor:jar:3.3.0:compile,The Apache Software License, Version 2.0,https://github.com/LMAX-Exchange/disruptor/
com.lmax:disruptor:jar:3.3.2:compile,The Apache Software License, Version 2.0,https://github.com/LMAX-Exchange/disruptor/
+com.lmax:disruptor:jar:3.3.6:compile,The Apache Software License, Version 2.0,https://github.com/LMAX-Exchange/disruptor/
com.googlecode.json-simple:json-simple:jar:1.1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/json-simple/
com.googlecode.json-simple:json-simple:jar:1.1.1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/json-simple/
com.jamesmurty.utils:java-xmlbuilder:jar:0.4:compile,Apache License, Version 2.0,http://code.google.com/p/java-xmlbuilder/
@@ -272,6 +280,9 @@ io.confluent:kafka-avro-serializer:jar:1.0:compile,ASLv2,https://github.com/conf
io.confluent:kafka-schema-registry-client:jar:1.0:compile,ASLv2,https://github.com/confluentinc/schema-registry/
io.dropwizard.metrics:metrics-core:jar:3.1.0:compile,ASLv2,https://github.com/dropwizard/metrics
io.dropwizard.metrics:metrics-core:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
+io.dropwizard.metrics:metrics-core:jar:3.2.1:compile,ASLv2,https://github.com/dropwizard/metrics
+io.dropwizard.metrics:metrics-core:jar:3.2.2:compile,ASLv2,https://github.com/dropwizard/metrics
+io.dropwizard.metrics:metrics-core:jar:3.2.6:compile,ASLv2,https://github.com/dropwizard/metrics
io.dropwizard.metrics:metrics-ganglia:jar:3.1.0:compile,ASLv2,https://github.com/dropwizard/metrics
io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile,ASLv2,https://github.com/dropwizard/metrics
io.dropwizard.metrics:metrics-graphite:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
@@ -305,11 +316,20 @@ org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile,The Apache Software L
org.codehaus.woodstox:stax2-api:jar:3.1.4:compile,The BSD License,http://wiki.fasterxml.com/WoodstoxStax2
org.codehaus.woodstox:woodstox-core-asl:jar:4.4.1:compile,ASLv2,http://woodstox.codehaus.org
org.eclipse.jetty:jetty-http:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org/jetty
+org.eclipse.jetty:jetty-http:jar:9.3.19.v20170502:compile,ASLv2,http://www.eclipse.org/jetty
org.eclipse.jetty:jetty-io:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org/jetty
+org.eclipse.jetty:jetty-io:jar:9.3.19.v20170502:compile,ASLv2,http://www.eclipse.org/jetty
org.eclipse.jetty:jetty-security:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org/jetty
+org.eclipse.jetty:jetty-security:jar:9.3.19.v20170502:compile,ASLv2,http://www.eclipse.org/jetty
org.eclipse.jetty:jetty-server:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org/jetty
+org.eclipse.jetty:jetty-server:jar:9.3.19.v20170502:compile,ASLv2,http://www.eclipse.org/jetty
org.eclipse.jetty:jetty-servlet:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org/jetty
+org.eclipse.jetty:jetty-servlet:jar:9.3.19.v20170502:compile,ASLv2,http://www.eclipse.org/jetty
org.eclipse.jetty:jetty-util:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org/jetty
+org.eclipse.jetty:jetty-util:jar:9.3.19.v20170502:compile,ASLv2,http://www.eclipse.org/jetty
+org.eclipse.jetty:jetty-util-ajax:jar:9.3.19.v20170502:compile,ASLv2,http://www.eclipse.org/jetty
+org.eclipse.jetty:jetty-webapp:jar:9.3.19.v20170502:compile,ASLv2,http://www.eclipse.org/jetty
+org.eclipse.jetty:jetty-xml:jar:9.3.19.v20170502:compile,ASLv2,http://www.eclipse.org/jetty
org.eclipse.jetty.aggregate:jetty-all:jar:7.6.0.v20120127:compile,ASLv2,http://www.eclipse.org/jetty
org.elasticsearch:elasticsearch:jar:2.3.3:compile,ASLv2,
org.elasticsearch:elasticsearch:jar:2.3.3:import,ASLv2,
@@ -543,3 +563,16 @@ org.datanucleus:datanucleus-rdbms:jar:3.2.9:compile,ASLv2,http://www.datanucleus
org.lz4:lz4-java:jar:1.4.1:compile,ASLv2,https://github.com/lz4/lz4-java
com.jolbox:bonecp:jar:0.8.0.RELEASE:compile,ASLv2
stax:stax-api:jar:1.0.1:compile,ASLv2
+org.glassfish.hk2.external:aopalliance-repackaged:jar:2.5.0-b32:compile,Common Development and Distribution License (CDDL) v1.0,https://javaee.github.io/hk2/
+org.glassfish.hk2.external:javax.inject:jar:2.5.0-b32:compile,Common Development and Distribution License (CDDL) v1.0,https://javaee.github.io/hk2/
+org.glassfish.hk2:hk2-api:jar:2.5.0-b32:compile,Common Development and Distribution License (CDDL) v1.0,https://javaee.github.io/hk2/
+org.glassfish.hk2:hk2-locator:jar:2.5.0-b32:compile,Common Development and Distribution License (CDDL) v1.0,https://javaee.github.io/hk2/
+org.glassfish.hk2:hk2-utils:jar:2.5.0-b32:compile,Common Development and Distribution License (CDDL) v1.0,https://javaee.github.io/hk2/
+org.glassfish.jersey.bundles.repackaged:jersey-guava:jar:2.25.1:compile,Eclipse Public License 2.0,https://jersey.github.io/
+org.glassfish.jersey.containers:jersey-container-servlet-core:jar:2.25.1:compile,Eclipse Public License 2.0,https://jersey.github.io/
+org.glassfish.jersey.core:jersey-client:jar:2.25.1:compile,Eclipse Public License 2.0,https://jersey.github.io/
+org.glassfish.jersey.core:jersey-common:jar:2.25.1:compile,Eclipse Public License 2.0,https://jersey.github.io/
+org.glassfish.jersey.core:jersey-server:jar:2.25.1:compile,Eclipse Public License 2.0,https://jersey.github.io/
+org.glassfish.jersey.media:jersey-media-jaxb:jar:2.25.1:compile,Eclipse Public License 2.0,https://jersey.github.io/
+org.glassfish.web:javax.servlet.jsp:jar:2.3.2:compile,Eclipse Public License 2.0,https://jersey.github.io/
+org.glassfish:javax.el:jar:3.0.1-b11:compile,Eclipse Public License 2.0,https://jersey.github.io/
diff --git a/metron-analytics/metron-profiler-storm/pom.xml b/metron-analytics/metron-profiler-storm/pom.xml
index 0984c24..5a2bd34 100644
--- a/metron-analytics/metron-profiler-storm/pom.xml
+++ b/metron-analytics/metron-profiler-storm/pom.xml
@@ -28,6 +28,7 @@
<guava_version>${global_hbase_guava_version}</guava_version>
</properties>
<dependencies>
+ <!-- Dependencies needed preferentially -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
index 487f166..68769bb 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
@@ -226,7 +226,7 @@ class EnrichmentCommands:
recursive_chown = True)
Logger.info("Loading HBase coprocessor for enrichments")
- Logger.info("See https://hbase.apache.org/1.1/book.html#load_coprocessor_in_shell for more detail")
+ Logger.info("See https://hbase.apache.org/2.0/book.html#load_coprocessor_in_shell for more detail")
Logger.info("HBase coprocessor setup - first disabling the enrichments HBase table.")
command_template = "echo \"disable '{0}'\" | hbase shell -n"
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
index 02f3f3a..7204049 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
@@ -134,6 +134,10 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest {
metaAlertIndex, MetaAlertControllerIntegrationTest.metaAlertData
);
loadTestData(testData);
+ MockHTable table = (MockHTable) MockHBaseTableProvider.getFromCache(TABLE);
+ if(table != null) {
+ table.clear();
+ }
}
@Test
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index fa2de43..710abc7 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -393,6 +393,13 @@
<shadedPattern>org.apache.metron.guava.${guava_version}</shadedPattern>
</relocation>
<relocation>
+ <!-- need to also relocate Guava's `thirdparty` packages. otherwise, the class
+ `thirdparty.publicsuffix.PublicSuffixPatterns` cannot be found after
relocation of Guava's `com.google.common` packages. -->
+ <pattern>com.google.thirdparty</pattern>
+ <shadedPattern>org.apache.metron.guava.thirdparty.${guava_version}</shadedPattern>
+ </relocation>
+ <relocation>
<pattern>org.apache.commons.beanutils</pattern>
<shadedPattern>org.apache.metron.beanutils</shadedPattern>
</relocation>
diff --git a/metron-platform/metron-data-management/pom.xml b/metron-platform/metron-data-management/pom.xml
index d4fae01..6ea2fb0 100644
--- a/metron-platform/metron-data-management/pom.xml
+++ b/metron-platform/metron-data-management/pom.xml
@@ -118,6 +118,13 @@
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
+ <artifactId>metron-enrichment-common</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
<artifactId>metron-hbase-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
@@ -185,7 +192,7 @@
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
- <artifactId>hbase-common</artifactId>
+ <artifactId>hbase-mapreduce</artifactId>
<version>${global_hbase_version}</version>
<exclusions>
<exclusion>
@@ -195,29 +202,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>${global_hbase_version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>${global_opencsv_version}</version>
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
index 947a5f4..6852f5b 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
@@ -28,6 +28,7 @@ import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
@@ -67,8 +68,8 @@ public class LeastRecentlyUsedPrunerIntegrationTest {
Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
config = kv.getValue();
testUtil = kv.getKey();
- testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
- atTable = testUtil.createTable(Bytes.toBytes(atTableName), Bytes.toBytes(atCF));
+ testTable = testUtil.createTable(TableName.valueOf(tableName), cf);
+ atTable = testUtil.createTable(TableName.valueOf(atTableName), atCF);
}
@AfterClass
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java
index 274989a..b68f6bd 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -191,7 +192,7 @@ public class SimpleEnrichmentFlatFileLoaderIntegrationTest {
Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
config = kv.getValue();
testUtil = kv.getKey();
- testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+ testTable = testUtil.createTable(TableName.valueOf(tableName), cf);
zookeeperUrl = getZookeeperUrl(config.get("hbase.zookeeper.quorum"), testUtil.getZkCluster().getClientPort());
setupGlobalConfig(zookeeperUrl);
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml
index 8edf5ab..052586a 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml
@@ -68,6 +68,18 @@
</exclusions>
</dependency>
<dependency>
+ <!-- elasticsearch requires a version > ${global_httpclient_version} -->
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.4.1</version>
+ </dependency>
+ <dependency>
+ <!-- elasticsearch requires a version > ${global_httpclient_version} -->
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.9</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${global_guava_version}</version>
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
index ec325e2..87c2f64 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
@@ -22,9 +22,11 @@ import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
@@ -35,6 +37,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.metron.enrichment.cache.CacheKey;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.hbase.HBaseUtils;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,8 +85,9 @@ public class CIFHbaseAdapter implements EnrichmentAdapter<CacheKey>,Serializable
try {
rs = table.get(get);
- for (KeyValue kv : rs.raw())
- output.put(new String(kv.getQualifier(), StandardCharsets.UTF_8), "Y");
+ for (Cell cell : rs.rawCells()) {
+ output.put(new String(HBaseUtils.getQualifier(cell), StandardCharsets.UTF_8), "Y");
+ }
} catch (IOException e) {
// TODO Auto-generated catch block
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/AbstractConverter.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/AbstractConverter.java
index 4b57677..a0fcc96 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/AbstractConverter.java
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/AbstractConverter.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.enrichment.lookup.LookupKV;
import org.apache.metron.enrichment.lookup.LookupKey;
import org.apache.metron.enrichment.lookup.LookupValue;
+import org.apache.metron.hbase.HBaseUtils;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -40,7 +41,7 @@ public abstract class AbstractConverter<KEY_T extends LookupKey, VALUE_T extends
@Nullable
@Override
public Map.Entry<byte[], byte[]> apply(@Nullable Cell cell) {
- return new AbstractMap.SimpleEntry<>(cell.getQualifier(), cell.getValue());
+ return new AbstractMap.SimpleEntry<>(HBaseUtils.getQualifier(cell), HBaseUtils.getValue(cell));
}
};
@Override
@@ -48,7 +49,7 @@ public abstract class AbstractConverter<KEY_T extends LookupKey, VALUE_T extends
Put put = new Put(key.toBytes());
byte[] cf = Bytes.toBytes(columnFamily);
for(Map.Entry<byte[], byte[]> kv : values.toColumns()) {
- put.add(cf, kv.getKey(), kv.getValue());
+ put.addColumn(cf, kv.getKey(), kv.getValue());
}
return put;
}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerUtil.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerUtil.java
index add1f0f..7339fd9 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerUtil.java
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerUtil.java
@@ -46,7 +46,7 @@ public enum AccessTrackerUtil {
public void persistTracker(Table accessTrackerTable, String columnFamily, PersistentAccessTracker.AccessTrackerKey key, AccessTracker underlyingTracker) throws IOException {
Put put = new Put(key.toRowKey());
- put.add(Bytes.toBytes(columnFamily), COLUMN, serializeTracker(underlyingTracker));
+ put.addColumn(Bytes.toBytes(columnFamily), COLUMN, serializeTracker(underlyingTracker));
accessTrackerTable.put(put);
}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
index ef04370..628aa49 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
@@ -104,7 +104,7 @@ public class SimpleHBaseAdapterTest {
@Before
public void setup() throws Exception {
final MockHTable trackerTable = (MockHTable) MockHBaseTableProvider.addToCache(atTableName, cf);
- final MockHTable hbaseTable = (MockHTable) MockHBaseTableProvider.addToCache(hbaseTableName, cf);
+ final MockHTable hbaseTable = (MockHTable) MockHBaseTableProvider.addToCache(hbaseTableName, cf, cf1);
EnrichmentHelper.INSTANCE.load(hbaseTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{
add(new LookupKV<>(new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, "10.0.2.3")
, new EnrichmentValue(PLAYFUL_ENRICHMENT)
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml b/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml
index 6aee1e6..4bbf653 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml
+++ b/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml
@@ -26,26 +26,42 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <slf4j.version>1.7.7</slf4j.version>
<storm.hdfs.version>0.1.2</storm.hdfs.version>
<commons-compress.version>1.13</commons-compress.version>
<geoip.version>2.8.0</geoip.version>
<guava.version>${global_hbase_guava_version}</guava.version>
</properties>
<dependencies>
+
<!-- Dependencies needed preferentially -->
+
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.2</version>
</dependency>
-
- <!-- Metron -->
+ <dependency>
+ <!-- prevents hbase and kafka from pulling in newer versions of jackson
+ (2.9.x) which conflict with metron's ${global_jackson_version}-->
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${global_jackson_version}</version>
+ </dependency>
+ <dependency>
+ <!-- prevents hbase and kafka from pulling in newer versions of jackson
+ (2.9.x) which conflict with metron's ${global_jackson_version}-->
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${global_jackson_version}</version>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${global_guava_version}</version>
</dependency>
+
+ <!-- Metron -->
+
<dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-enrichment-common</artifactId>
@@ -219,14 +235,12 @@
<version>${global_caffeine_version}</version>
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${global_jackson_version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- <version>${global_jackson_version}</version>
+ <!-- prevents storm and hadoop from pulling in newer versions of slf4j-api
(1.7.x) which conflict with kafka and metron's ${global_slf4j_version}-->
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${global_slf4j_version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-validator</groupId>
diff --git a/metron-platform/metron-hbase-server/pom.xml b/metron-platform/metron-hbase-server/pom.xml
index e920bad..0d33af4 100644
--- a/metron-platform/metron-hbase-server/pom.xml
+++ b/metron-platform/metron-hbase-server/pom.xml
@@ -51,6 +51,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
+ <exclusion>
+ <artifactId>hadoop-common</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -278,6 +282,8 @@
<exclude>org/slf4j/**</exclude>
<exclude>META-INF/maven/org.slf4j/**</exclude>
<exclude>META-INF/license/LICENSE.slf4j.txt</exclude>
+ <!-- hadoop deps from metron-common are not needed here and cause classpath conflicts with hbase -->
+ <exclude>org/apache/hadoop/**</exclude>
</excludes>
</filter>
</filters>
diff --git a/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessor.java b/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessor.java
index ae43392..e85e6da 100644
--- a/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessor.java
+++ b/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessor.java
@@ -25,16 +25,19 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
+import java.util.Optional;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.enrichment.converter.EnrichmentKey;
@@ -64,13 +67,14 @@ import org.slf4j.LoggerFactory;
* </ul>
*
* @see <a href="https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html">https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html</a>
+ * @see <a href="https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionCoprocessor.html">https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionCoprocessor.html</a>
* @see EnrichmentConfigurations Available options.
*/
-public class EnrichmentCoprocessor extends BaseRegionObserver {
+public class EnrichmentCoprocessor implements RegionObserver, RegionCoprocessor {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// pass in via coprocessor config options - via hbase shell or hbase-site.xml
- // see more here - https://hbase.apache.org/1.1/book.html#load_coprocessor_in_shell
+ // see more here - https://hbase.apache.org/2.0/book.html#load_coprocessor_in_shell
public static final String ZOOKEEPER_URL = "zookeeperUrl";
public static final String COLUMN_QUALIFIER = "v";
private Cache<String, String> cache;
@@ -101,6 +105,11 @@ public class EnrichmentCoprocessor extends BaseRegionObserver {
}
@Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
public void start(CoprocessorEnvironment ce) throws IOException {
LOG.info("Starting enrichment coprocessor");
if (ce instanceof RegionCoprocessorEnvironment) {
diff --git a/metron-platform/metron-hbase-server/src/test/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessorIntegrationTest.java b/metron-platform/metron-hbase-server/src/test/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessorIntegrationTest.java
index 2bc756e..08e8abd 100644
--- a/metron-platform/metron-hbase-server/src/test/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessorIntegrationTest.java
+++ b/metron-platform/metron-hbase-server/src/test/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessorIntegrationTest.java
@@ -141,10 +141,10 @@ public class EnrichmentCoprocessorIntegrationTest extends BaseIntegrationTest {
Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true, extraConfig);
testUtil = kv.getKey();
hBaseConfig = kv.getValue();
- enrichmentTable = testUtil.createTable(Bytes.toBytes(ENRICHMENT_TABLE), Bytes.toBytes(
+ enrichmentTable = testUtil.createTable(TableName.valueOf(ENRICHMENT_TABLE), Bytes.toBytes(
COLUMN_FAMILY));
enrichmentListTable = testUtil
- .createTable(Bytes.toBytes(ENRICHMENT_LIST_TABLE), Bytes.toBytes(COLUMN_FAMILY));
+ .createTable(TableName.valueOf(ENRICHMENT_LIST_TABLE), Bytes.toBytes(COLUMN_FAMILY));
for (Result r : enrichmentTable.getScanner(Bytes.toBytes(COLUMN_FAMILY))) {
Delete d = new Delete(r.getRow());
diff --git a/metron-platform/metron-hbase/metron-hbase-common/pom.xml b/metron-platform/metron-hbase/metron-hbase-common/pom.xml
index 84b24d3..5b484c2 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/pom.xml
+++ b/metron-platform/metron-hbase/metron-hbase-common/pom.xml
@@ -52,6 +52,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <exclusion>
+ <!-- to avoid pulling in guava 12 -->
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -89,6 +94,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <exclusion>
+ <!-- to avoid pulling in guava 11 -->
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -97,6 +107,13 @@
<classifier>tests</classifier>
<version>${global_hadoop_version}</version>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <!-- to avoid pulling in guava 11 -->
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseUtils.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseUtils.java
new file mode 100644
index 0000000..1f83349
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseUtils.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase;
+
+import org.apache.hadoop.hbase.Cell;
+
+import java.util.Arrays;
+
+/**
+ * Helpers for copying individual components out of an HBase {@link Cell}.
+ */
+public class HBaseUtils {
+
+  /**
+   * Retrieves the column qualifier from a cell.
+   *
+   * <p>The qualifier offset and length are applied to the cell's qualifier
+   * backing array, so this does not rely on the row and qualifier sharing
+   * one array.
+   *
+   * @param cell An HBase cell.
+   * @return A new array holding only the qualifier bytes of the cell.
+   */
+  public static byte[] getQualifier(Cell cell) {
+    int offset = cell.getQualifierOffset();
+    int length = cell.getQualifierLength();
+    return Arrays.copyOfRange(cell.getQualifierArray(), offset, offset + length);
+  }
+
+  /**
+   * Retrieves the value from an HBase cell.
+   *
+   * <p>The value offset and length are applied to the cell's value backing
+   * array, so this does not rely on the row and value sharing one array.
+   *
+   * @param cell An HBase cell.
+   * @return A new array holding only the value bytes of the cell.
+   */
+  public static byte[] getValue(Cell cell) {
+    int offset = cell.getValueOffset();
+    int length = cell.getValueLength();
+    return Arrays.copyOfRange(cell.getValueArray(), offset, offset + length);
+  }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
index 3fe1cfa..d5836d9 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
@@ -254,10 +254,10 @@ public class HBaseClient implements Closeable {
for (ColumnList.Column col : cols.getColumns()) {
if (col.getTs() > 0) {
- put.add(col.getFamily(), col.getQualifier(), col.getTs(), col.getValue());
+ put.addColumn(col.getFamily(), col.getQualifier(), col.getTs(), col.getValue());
} else {
- put.add(col.getFamily(), col.getQualifier(), col.getValue());
+ put.addColumn(col.getFamily(), col.getQualifier(), col.getValue());
}
}
}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
index cb58b57..b2c0cef 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -83,7 +84,7 @@ public class HBaseClientTest {
util.startMiniCluster();
admin = util.getHBaseAdmin();
// create the table
- table = util.createTable(Bytes.toBytes(tableName), cf);
+ table = util.createTable(TableName.valueOf(tableName), cf);
util.waitTableEnabled(table.getName());
// setup the client
client = new HBaseClient((c,t) -> table, table.getConfiguration(), tableName);
@@ -91,7 +92,7 @@ public class HBaseClientTest {
@AfterClass
public static void stopHBase() throws Exception {
- util.deleteTable(tableName);
+ util.deleteTable(TableName.valueOf(tableName));
util.shutdownMiniCluster();
util.cleanupTestDir();
}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/mock/MockHBaseTableProvider.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/mock/MockHBaseTableProvider.java
index 3d4316d..4d7d52b 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/mock/MockHBaseTableProvider.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/mock/MockHBaseTableProvider.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.metron.hbase.TableProvider;
@@ -37,7 +38,7 @@ public class MockHBaseTableProvider implements Serializable, TableProvider {
}
public static Table addToCache(String tableName, String... columnFamilies) {
- MockHTable ret = new MockHTable(tableName, columnFamilies);
+ MockHTable ret = new MockHTable(TableName.valueOf(tableName), columnFamilies);
_cache.put(tableName, ret);
return ret;
}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/mock/MockHTable.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/mock/MockHTable.java
index 521f055..7ca85a4 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/mock/MockHTable.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/mock/MockHTable.java
@@ -17,25 +17,16 @@
*/
package org.apache.metron.hbase.mock;
-
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.NoSuchElementException;
-import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
@@ -44,6 +35,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -51,644 +43,749 @@ import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+
/**
* MockHTable.
*
- * This implementation is a selected excerpt from https://gist.github.com/agaoglu/613217
+ * This implementation is a selected excerpt from https://gist.github.com/agaoglu/613217 and
+ * https://github.com/rayokota/hgraphdb/blob/07c551f39a92b7ee2c8b48edcc7c0b314f6c3e33/src/main/java/org/apache/hadoop/hbase/client/mock/MockHTable.java.
*/
public class MockHTable implements Table {
+ private final TableName tableName;
+ private final List<String> columnFamilies = new ArrayList<>();
+ private Configuration config;
+ private List<Put> putLog;
+ private final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data
+ = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+
+ private static List<Cell> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
+ return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
+ }
+
+    /**
+     * Flattens the stored cells of one row into a list, newest version first
+     * within each column.
+     *
+     * <p>Only versions whose timestamp lies within
+     * [timestampStart, timestampEnd] are returned, and at most maxVersions of
+     * those per column. Versions outside the time range do not count toward
+     * maxVersions.
+     *
+     * @param row            the row key
+     * @param rowdata        family -> qualifier -> timestamp -> value for the row
+     * @param timestampStart lowest timestamp to include
+     * @param timestampEnd   highest timestamp to include
+     * @param maxVersions    maximum number of in-range versions per column
+     * @return the matching cells
+     */
+    private static List<Cell> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
+        List<Cell> ret = new ArrayList<>();
+        for (byte[] family : rowdata.keySet()) {
+            for (byte[] qualifier : rowdata.get(family).keySet()) {
+                int versionsAdded = 0;
+                for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) {
+                    Long timestamp = tsToVal.getKey();
+                    // filter on the time range first so that skipped versions
+                    // do not use up the maxVersions budget
+                    if (timestamp < timestampStart || timestamp > timestampEnd) {
+                        continue;
+                    }
+                    if (versionsAdded++ == maxVersions) {
+                        break;
+                    }
+                    byte[] value = tsToVal.getValue();
+                    ret.add(new KeyValue(row, family, qualifier, timestamp, value));
+                }
+            }
+        }
+        return ret;
+    }
+
+ @SuppressWarnings("WeakerAccess")
+ public MockHTable(TableName tableName) {
+ this.tableName = tableName;
+ this.putLog = new ArrayList<>();
+ }
+
+ public MockHTable(TableName tableName, String... columnFamilies) {
+ this.tableName = tableName;
+ this.columnFamilies.addAll(Arrays.asList(columnFamilies));
+ this.putLog = new ArrayList<>();
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public MockHTable(TableName tableName, List<String> columnFamilies) {
+ this.tableName = tableName;
+ this.columnFamilies.addAll(columnFamilies);
+ this.putLog = new ArrayList<>();
+ }
+
+ public int size() {
+ return data.size();
+ }
+
+ public byte[] getTableName() {
+ return getName().getName();
+ }
+
+ @Override
+ public TableName getName() {
+ return tableName;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ public MockHTable setConfiguration(Configuration config) {
+ this.config = config;
+ return this;
+ }
+
+ @Override
+ public HTableDescriptor getTableDescriptor() throws IOException {
+ HTableDescriptor table = new HTableDescriptor(tableName);
+ for (String columnFamily : columnFamilies) {
+ table.addFamily(new HColumnDescriptor(columnFamily));
+ }
+ return table;
+ }
+
+ @Override
+ public TableDescriptor getDescriptor() throws IOException {
+ return getTableDescriptor();
+ }
+
+ @Override
+ public boolean exists(Get get) throws IOException {
+ Result result = get(get);
+ return result != null && !result.isEmpty();
+ }
- private final String tableName;
- private final List<String> columnFamilies = new ArrayList<>();
- private HColumnDescriptor[] descriptors;
- private final List<Put> putLog;
- private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data
- = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-
- private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
- return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
- }
-
- private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
- List<KeyValue> ret = new ArrayList<KeyValue>();
- for (byte[] family : rowdata.keySet())
- for (byte[] qualifier : rowdata.get(family).keySet()) {
- int versionsAdded = 0;
- for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) {
- if (versionsAdded++ == maxVersions)
- break;
- Long timestamp = tsToVal.getKey();
- if (timestamp < timestampStart)
- continue;
- if (timestamp > timestampEnd)
- continue;
- byte[] value = tsToVal.getValue();
- ret.add(new KeyValue(row, family, qualifier, timestamp, value));
+ /**
+ * Test for the existence of columns in the table, as specified by the Gets.
+ *
+ * <p>This will return an array of booleans. Each value will be true if the related Get matches
+ * one or more keys, false if not.
+ *
+ * <p>This is a server-side call so it prevents any data from being transferred to
+ * the client.
+ *
+ * @param gets the Gets
+ * @return Array of boolean. True if the specified Get matches one or more keys, false if not.
+ * @throws IOException e
+ */
+    @Override
+    public boolean[] existsAll(List<Get> gets) throws IOException {
+        // Delegate to exists(List), then copy its answers into an array sized
+        // to the number of Gets requested.
+        final boolean[] found = exists(gets);
+        final boolean[] answers = new boolean[gets.size()];
+        System.arraycopy(found, 0, answers, 0, found.length);
+        return answers;
+    }
+
+    /**
+     * Checks each Get individually via {@code exists(Get)}.
+     *
+     * @param list the Gets to check
+     * @return one flag per Get, in the same order as the input list
+     * @throws IOException if an individual check fails
+     */
+    @Override
+    public boolean[] exists(List<Get> list) throws IOException {
+        final boolean[] found = new boolean[list.size()];
+        for (int idx = 0; idx < found.length; idx++) {
+            found[idx] = exists(list.get(idx));
+        }
+        return found;
+    }
+
+ @Override
+ public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
+ Object[] rows = batch(actions);
+ System.arraycopy(rows, 0, results, 0, rows.length);
+ }
+
+    /**
+     * Applies each action to this mock table in list order.
+     *
+     * <p>Delete and Put actions produce an empty Result; Get, Increment and
+     * Append actions produce the Result of the corresponding call. Any other
+     * Row type leaves its slot null.
+     *
+     * @param actions the actions to run
+     * @return one entry per action, at the same index as the action
+     */
+    public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
+        final Object[] outcomes = new Object[actions.size()];
+        int slot = 0;
+        for (Row action : actions) {
+            if (action instanceof Delete) {
+                delete((Delete) action);
+                outcomes[slot] = new Result();
+            } else if (action instanceof Put) {
+                put((Put) action);
+                outcomes[slot] = new Result();
+            } else if (action instanceof Get) {
+                outcomes[slot] = get((Get) action);
+            } else if (action instanceof Increment) {
+                outcomes[slot] = increment((Increment) action);
+            } else if (action instanceof Append) {
+                outcomes[slot] = append((Append) action);
+            }
+            slot++;
+        }
+        return outcomes;
+    }
+
+ @Deprecated
+ @Override
+ public <R> void batchCallback(final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback) throws IOException, InterruptedException {
+ throw new RuntimeException(this.getClass() + " does NOT implement this method.");
+ }
+
+ @Override
+ public Result get(Get get) throws IOException {
+ if (!data.containsKey(get.getRow()))
+ return new Result();
+ byte[] row = get.getRow();
+ List<Cell> kvs = new ArrayList<>();
+ Filter filter = get.getFilter();
+ int maxResults = get.getMaxResultsPerColumnFamily();
+
+ if (!get.hasFamilies()) {
+ kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
+ if (filter != null) {
+ kvs = filter(filter, kvs);
+ }
+ if (maxResults >= 0 && kvs.size() > maxResults) {
+ kvs = kvs.subList(0, maxResults);
+ }
+ } else {
+ for (byte[] family : get.getFamilyMap().keySet()) {
+ if (data.get(row).get(family) == null)
+ continue;
+ NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family);
+ if (qualifiers == null || qualifiers.isEmpty())
+ qualifiers = data.get(row).get(family).navigableKeySet();
+ List<Cell> familyKvs = new ArrayList<>();
+ for (byte[] qualifier : qualifiers) {
+ if (qualifier == null)
+ qualifier = "".getBytes(StandardCharsets.UTF_8);
+ if (!data.get(row).containsKey(family) ||
+ !data.get(row).get(family).containsKey(qualifier) ||
+ data.get(row).get(family).get(qualifier).isEmpty())
+ continue;
+ Map.Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry();
+ familyKvs.add(new KeyValue(row, family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue()));
+ }
+ if (filter != null) {
+ familyKvs = filter(filter, familyKvs);
+ }
+ if (maxResults >= 0 && familyKvs.size() > maxResults) {
+ familyKvs = familyKvs.subList(0, maxResults);
+ }
+ kvs.addAll(familyKvs);
+ }
+ }
+ return Result.create(kvs);
+ }
+
+ @Override
+ public Result[] get(List<Get> gets) throws IOException {
+ List<Result> results = new ArrayList<>();
+ for (Get g : gets) {
+ results.add(get(g));
+ }
+ return results.toArray(new Result[results.size()]);
+ }
+
+    /**
+     * Scans the in-memory rows in key order (descending when the scan is
+     * reversed) and returns the matching rows through an iterator-backed scanner.
+     *
+     * <p>The start row is inclusive and the stop row is exclusive, in both scan
+     * directions. A row equal to the start row is always emitted, so a scan whose
+     * start and stop rows are identical returns that single row. The stop row is
+     * honoured even when no start row is set.
+     *
+     * @param scan the scan describing the row range, families, time range, filter and limits
+     * @return a scanner over the rows collected while this method ran
+     * @throws IOException as declared by {@link Table#getScanner(Scan)}
+     */
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+        final List<Result> ret = new ArrayList<>();
+        byte[] st = scan.getStartRow();
+        byte[] sp = scan.getStopRow();
+        Filter filter = scan.getFilter();
+        int maxResults = scan.getMaxResultsPerColumnFamily();
+
+        Set<byte[]> dataKeySet = scan.isReversed() ? data.descendingKeySet() : data.keySet();
+        for (byte[] row : dataKeySet) {
+            // if row is equal to startRow emit it. When startRow (inclusive) and
+            // stopRow (exclusive) is the same, it should not be excluded which would
+            // happen w/o this control. Every other row is range-checked, including
+            // when no startRow was supplied at all.
+            if (st == null || st.length == 0 ||
+                    Bytes.BYTES_COMPARATOR.compare(st, row) != 0) {
+                if (scan.isReversed()) {
+                    // if row is before startRow (i.e. sorts after it) do not emit,
+                    // pass to next row
+                    if (st != null && st.length > 0 &&
+                            Bytes.BYTES_COMPARATOR.compare(st, row) < 0)
+                        continue;
+                    // if row is equal to stopRow or after it (i.e. sorts before it)
+                    // do not emit, stop iteration
+                    if (sp != null && sp.length > 0 &&
+                            Bytes.BYTES_COMPARATOR.compare(sp, row) >= 0)
+                        break;
+                } else {
+                    // if row is before startRow do not emit, pass to next row
+                    if (st != null && st.length > 0 &&
+                            Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
+                        continue;
+                    // if row is equal to stopRow or after it do not emit, stop iteration
+                    if (sp != null && sp.length > 0 &&
+                            Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
+                        break;
+                }
+            }
+
+            List<Cell> kvs;
+            if (!scan.hasFamilies()) {
+                kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions());
+                if (filter != null) {
+                    kvs = filter(filter, kvs);
+                }
+                if (maxResults >= 0 && kvs.size() > maxResults) {
+                    kvs = kvs.subList(0, maxResults);
+                }
+            } else {
+                kvs = new ArrayList<>();
+                for (byte[] family : scan.getFamilyMap().keySet()) {
+                    if (data.get(row).get(family) == null)
+                        continue;
+                    NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
+                    // no explicit qualifiers means every qualifier stored for the family
+                    if (qualifiers == null || qualifiers.isEmpty())
+                        qualifiers = data.get(row).get(family).navigableKeySet();
+                    List<Cell> familyKvs = new ArrayList<>();
+                    for (byte[] qualifier : qualifiers) {
+                        if (data.get(row).get(family).get(qualifier) == null)
+                            continue;
+                        List<KeyValue> tsKvs = new ArrayList<>();
+                        // newest first; only in-range versions count toward max versions
+                        for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()) {
+                            if (timestamp < scan.getTimeRange().getMin())
+                                continue;
+                            if (timestamp > scan.getTimeRange().getMax())
+                                continue;
+                            byte[] value = data.get(row).get(family).get(qualifier).get(timestamp);
+                            tsKvs.add(new KeyValue(row, family, qualifier, timestamp, value));
+                            if (tsKvs.size() == scan.getMaxVersions()) {
+                                break;
+                            }
+                        }
+                        familyKvs.addAll(tsKvs);
+                    }
+                    if (filter != null) {
+                        familyKvs = filter(filter, familyKvs);
+                    }
+                    if (maxResults >= 0 && familyKvs.size() > maxResults) {
+                        familyKvs = familyKvs.subList(0, maxResults);
+                    }
+                    kvs.addAll(familyKvs);
+                }
+            }
+            if (!kvs.isEmpty()) {
+                ret.add(Result.create(kvs));
+            }
+            // Check for early out optimization
+            if (filter != null && filter.filterAllRemaining()) {
+                break;
+            }
+        }
+
+        return new ResultScanner() {
+            // single shared iterator: iterator(), next() and next(int) all advance it
+            private final Iterator<Result> iterator = ret.iterator();
+
+            @Override
+            public Iterator<Result> iterator() {
+                return iterator;
+            }
+
+            @Override
+            public Result[] next(int nbRows) throws IOException {
+                ArrayList<Result> resultSets = new ArrayList<>(nbRows);
+                for (int i = 0; i < nbRows; i++) {
+                    Result next = next();
+                    if (next != null) {
+                        resultSets.add(next);
+                    } else {
+                        break;
+                    }
+                }
+                return resultSets.toArray(new Result[resultSets.size()]);
+            }
+
+            @Override
+            public Result next() throws IOException {
+                // an exhausted scanner reports null rather than throwing
+                try {
+                    return iterator().next();
+                } catch (NoSuchElementException e) {
+                    return null;
+                }
+            }
+
+            @Override
+            public void close() {
+            }
+
+            public ScanMetrics getScanMetrics() {
+                return null;
+            }
+
+            public boolean renewLease() {
+                return false;
+            }
+        };
+    }
+
+ @Override
+ public ResultScanner getScanner(byte[] family) throws IOException {
+ Scan scan = new Scan();
+ scan.addFamily(family);
+ return getScanner(scan);
+ }
+
+ @Override
+ public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
+ Scan scan = new Scan();
+ scan.addColumn(family, qualifier);
+ return getScanner(scan);
+ }
+
+ public List<Put> getPutLog() {
+ synchronized (putLog) {
+ return ImmutableList.copyOf(putLog);
+ }
+ }
+
+ public void addToPutLog(Put put) {
+ synchronized(putLog) {
+ putLog.add(put);
+ }
+ }
+
+ public void clear() {
+ synchronized (putLog) {
+ putLog.clear();
+ }
+ data.clear();
+ }
+
+ @Override
+ public void put(Put put) throws IOException {
+ addToPutLog(put);
+
+ byte[] row = put.getRow();
+ NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
+ for (byte[] family : put.getFamilyCellMap().keySet()) {
+ if (!columnFamilies.contains(new String(family, StandardCharsets.UTF_8))) {
+ throw new RuntimeException("Not Exists columnFamily : " + new String(family, StandardCharsets.UTF_8));
+ }
+ NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
+ for (Cell kv : put.getFamilyCellMap().get(family)) {
+ long ts = put.getTimeStamp();
+ if (ts == HConstants.LATEST_TIMESTAMP) ts = System.currentTimeMillis();
+ CellUtil.updateLatestStamp(kv, ts);
+ byte[] qualifier = CellUtil.cloneQualifier(kv);
+ NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new ConcurrentSkipListMap<>());
+ qualifierData.put(kv.getTimestamp(), CellUtil.cloneValue(kv));
+ }
+ }
+ }
+
+ /**
+ * Helper method to find a key in a map. If key is not found, newObject is
+ * added to map and returned
+ *
+ * @param map
+ * map to extract value from
+ * @param key
+ * key to look for
+ * @param newObject
+ * set key to this if not found
+ * @return found value or newObject if not found
+ */
+ private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject) {
+ V data = map.putIfAbsent(key, newObject);
+ if (data == null) {
+ data = newObject;
+ }
+ return data;
+ }
+
+ @Override
+ public void put(List<Put> puts) throws IOException {
+ for (Put put : puts) {
+ put(put);
+ }
+ }
+
+ @Override
+ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
+ return checkAndPut(row, family, qualifier, CompareFilter.CompareOp.EQUAL, value, put);
+ }
+
+ /**
+ * Atomically checks if a row/family/qualifier value matches the expected
+ * value. If it does, it adds the put. If the passed value is null, the check
+ * is for the lack of column (i.e. non-existence)
+ *
+ * @param row to check
+ * @param family column family to check
+ * @param qualifier column qualifier to check
+ * @param compareOp comparison operator to use
+ * @param value the expected value
+ * @param put data to put if check succeeds
+ * @return true if the new put was executed, false otherwise
+ * @throws IOException e
+ */
+ @Override
+ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
+ if (check(row, family, qualifier, compareOp, value)) {
+ put(put);
+ return true;
}
- }
- return ret;
- }
- public MockHTable(String tableName) {
- this.tableName = tableName;
- this.putLog = new ArrayList<>();
- }
-
- public MockHTable(String tableName, String... columnFamilies) {
- this(tableName);
- for(String cf : columnFamilies) {
- addColumnFamily(cf);
- }
- }
-
- public int size() {
- return data.size();
- }
-
- public void addColumnFamily(String columnFamily) {
- this.columnFamilies.add(columnFamily);
- descriptors = new HColumnDescriptor[columnFamilies.size()];
- int i = 0;
- for(String cf : columnFamilies) {
- descriptors[i++] = new HColumnDescriptor(cf);
- }
- }
-
- public byte[] getTableName() {
- return Bytes.toBytes(tableName);
- }
-
- @Override
- public TableName getName() {
- return TableName.valueOf(tableName);
- }
-
- @Override
- public Configuration getConfiguration() {
- return HBaseConfiguration.create();
- }
-
- @Override
- public HTableDescriptor getTableDescriptor() throws IOException {
- HTableDescriptor ret = new HTableDescriptor(tableName);
- for(HColumnDescriptor c : descriptors) {
- ret.addFamily(c);
- }
- return ret;
- }
-
- @Override
- public boolean exists(Get get) throws IOException {
- if(get.getFamilyMap() == null || get.getFamilyMap().size() == 0) {
- return data.containsKey(get.getRow());
- } else {
- byte[] row = get.getRow();
- if(!data.containsKey(row)) {
return false;
- }
- for(byte[] family : get.getFamilyMap().keySet()) {
- if(!data.get(row).containsKey(family)) {
- return false;
+ }
+
+ @Override
+ public void delete(Delete delete) throws IOException {
+ byte[] row = delete.getRow();
+ if (data.containsKey(row)) {
+ data.remove(row);
} else {
- return true;
+ throw new IOException("Nothing to delete");
}
- }
- return true;
- }
- }
-
- /**
- * Test for the existence of columns in the table, as specified by the Gets.
- *
- * <p>This will return an array of booleans. Each value will be true if the related Get matches
- * one or more keys, false if not.
- *
- * <p>This is a server-side call so it prevents any data from being transferred to
- * the client.
- *
- * @param gets the Gets
- * @return Array of boolean. True if the specified Get matches one or more keys, false if not.
- * @throws IOException e
- */
- @Override
- public boolean[] existsAll(List<Get> gets) throws IOException {
- boolean[] ret = new boolean[gets.size()];
- int i = 0;
- for(boolean b : exists(gets)) {
- ret[i++] = b;
- }
- return ret;
- }
-
- public Boolean[] exists(List<Get> list) throws IOException {
- Boolean[] ret = new Boolean[list.size()];
- int i = 0;
- for(Get g : list) {
- ret[i++] = exists(g);
- }
- return ret;
- }
-
- @Override
- public void batch(List<? extends Row> list, Object[] objects) throws IOException, InterruptedException {
- Object[] results = batch(list);
- System.arraycopy(results, 0, objects, 0, results.length);
- }
-
- /**
- * @param actions
- * @deprecated
- */
- @Deprecated
- @Override
- public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
- List<Result> results = new ArrayList<Result>();
- for (Row r : actions) {
- if (r instanceof Delete) {
- delete((Delete) r);
- continue;
- }
- if (r instanceof Put) {
- put((Put) r);
- continue;
- }
- if (r instanceof Get) {
- results.add(get((Get) r));
- }
- }
- return results.toArray();
- }
-
- @Override
- public <R> void batchCallback(List<? extends Row> list, Object[] objects, Batch.Callback<R> callback) throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
-
- }
-
- /**
- * @param list
- * @param callback
- * @deprecated
- */
- @Deprecated
- @Override
- public <R> Object[] batchCallback(List<? extends Row> list, Batch.Callback<R> callback) throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Result get(Get get) throws IOException {
- if (!data.containsKey(get.getRow()))
- return new Result();
- byte[] row = get.getRow();
- List<KeyValue> kvs = new ArrayList<KeyValue>();
- if (!get.hasFamilies()) {
- kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
- } else {
- for (byte[] family : get.getFamilyMap().keySet()){
- if (data.get(row).get(family) == null)
- continue;
- NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family);
- if (qualifiers == null || qualifiers.isEmpty())
- qualifiers = data.get(row).get(family).navigableKeySet();
- for (byte[] qualifier : qualifiers){
- if (qualifier == null)
- qualifier = "".getBytes(StandardCharsets.UTF_8);
- if (!data.get(row).containsKey(family) ||
- !data.get(row).get(family).containsKey(qualifier) ||
- data.get(row).get(family).get(qualifier).isEmpty())
- continue;
- Map.Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry();
- kvs.add(new KeyValue(row,family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue()));
+ }
+
+ @Override
+ public void delete(List<Delete> deletes) throws IOException {
+ for (Delete delete : deletes) {
+ delete(delete);
}
- }
- }
- Filter filter = get.getFilter();
- if (filter != null) {
- filter.reset();
- List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
- for (KeyValue kv : kvs) {
- if (filter.filterAllRemaining()) {
- break;
+ }
+
+ @Override
+ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException {
+ return checkAndDelete(row, family, qualifier, CompareFilter.CompareOp.EQUAL, value, delete);
+ }
+
+ /**
+ * Atomically checks if a row/family/qualifier value matches the expected
+ * value. If it does, it adds the delete. If the passed value is null, the
+ * check is for the lack of column (i.e. non-existence)
+ *
+ * @param row to check
+ * @param family column family to check
+ * @param qualifier column qualifier to check
+ * @param compareOp comparison operator to use
+ * @param value the expected value
+ * @param delete data to delete if check succeeds
+ * @return true if the new delete was executed, false otherwise
+ * @throws IOException e
+ */
+ @Override
+ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
+ if (check(row, family, qualifier, compareOp, value)) {
+ delete(delete);
+ return true;
}
- if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
- continue;
+ return false;
+ }
+
+ @Override
+ public void mutateRow(RowMutations rm) throws IOException {
+ // currently only support Put and Delete
+ long maxTs = System.currentTimeMillis();
+ for (Mutation mutation : rm.getMutations()) {
+ if (mutation instanceof Put) {
+ put((Put) mutation);
+ } else if (mutation instanceof Delete) {
+ delete((Delete) mutation);
+ }
+ long ts = mutation.getTimeStamp();
+ if (ts != HConstants.LATEST_TIMESTAMP && ts > maxTs) maxTs = ts;
}
- if (filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE) {
- nkvs.add(kv);
+ long now = System.currentTimeMillis();
+ if (now <= maxTs) {
+ // we have intentionally set the ts in the future, so wait
+ try {
+ Thread.sleep(maxTs - now + 1);
+ } catch (InterruptedException ignored) { }
}
- // ignoring next key hint which is a optimization to reduce file system IO
- }
- if (filter.hasFilterRow()) {
- filter.filterRow();
- }
- kvs = nkvs;
- }
-
- return new Result(kvs);
- }
-
- @Override
- public Result[] get(List<Get> list) throws IOException {
- Result[] ret = new Result[list.size()];
- int i = 0;
- for(Get g : list) {
- ret[i++] = get(g);
- }
- return ret;
- }
-
- /**
- * @param bytes
- * @param bytes1
- * @deprecated
- */
- @Deprecated
- public Result getRowOrBefore(byte[] bytes, byte[] bytes1) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ResultScanner getScanner(Scan scan) throws IOException {
- final List<Result> ret = new ArrayList<Result>();
- byte[] st = scan.getStartRow();
- byte[] sp = scan.getStopRow();
- Filter filter = scan.getFilter();
-
- for (byte[] row : data.keySet()){
- // if row is equal to startRow emit it. When startRow (inclusive) and
- // stopRow (exclusive) is the same, it should not be excluded which would
- // happen w/o this control.
- if (st != null && st.length > 0 &&
- Bytes.BYTES_COMPARATOR.compare(st, row) != 0) {
- // if row is before startRow do not emit, pass to next row
- if (st != null && st.length > 0 &&
- Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
- continue;
- // if row is equal to stopRow or after it do not emit, stop iteration
- if (sp != null && sp.length > 0 &&
- Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
- break;
- }
-
- List<KeyValue> kvs = null;
- if (!scan.hasFamilies()) {
- kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions());
- } else {
- kvs = new ArrayList<KeyValue>();
- for (byte[] family : scan.getFamilyMap().keySet()){
- if (data.get(row).get(family) == null)
- continue;
- NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
- if (qualifiers == null || qualifiers.isEmpty())
- qualifiers = data.get(row).get(family).navigableKeySet();
- for (byte[] qualifier : qualifiers){
- if (data.get(row).get(family).get(qualifier) == null)
- continue;
- for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()){
- if (timestamp < scan.getTimeRange().getMin())
- continue;
- if (timestamp > scan.getTimeRange().getMax())
- continue;
- byte[] value = data.get(row).get(family).get(qualifier).get(timestamp);
- kvs.add(new KeyValue(row, family, qualifier, timestamp, value));
- if(kvs.size() == scan.getMaxVersions()) {
+ }
+
+ @Override
+ public Result append(Append append) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Result increment(Increment increment) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Follows the logical flow through the filter methods for a single row.
+ *
+ * @param filter HBase filter.
+ * @param kvs List of a row's KeyValues
+ * @return List of KeyValues that were not filtered.
+ */
+ private List<Cell> filter(Filter filter, List<Cell> kvs) throws IOException {
+ filter.reset();
+
+ List<Cell> tmp = new ArrayList<>(kvs.size());
+ tmp.addAll(kvs);
+
+ /*
+ * Note. Filter flow for a single row. Adapted from
+ * "HBase: The Definitive Guide" (p. 163) by Lars George, 2011.
+ * See Figure 4-2 on p. 163.
+ */
+ boolean filteredOnRowKey = false;
+ List<Cell> nkvs = new ArrayList<>(tmp.size());
+ for (Cell kv : tmp) {
+ if (filter.filterRowKey(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())) {
+ filteredOnRowKey = true;
+ break;
+ }
+ Filter.ReturnCode filterResult = filter.filterKeyValue(kv);
+ if (filterResult == Filter.ReturnCode.INCLUDE || filterResult == Filter.ReturnCode.INCLUDE_AND_NEXT_COL) {
+ nkvs.add(filter.transformCell(kv));
+ } else if (filterResult == Filter.ReturnCode.NEXT_ROW) {
break;
- }
+ } else if (filterResult == Filter.ReturnCode.NEXT_COL || filterResult == Filter.ReturnCode.SKIP) {
+ //noinspection UnnecessaryContinue
+ continue;
}
- }
+ /*
+ * Ignoring next key hint which is an optimization to reduce file
+ * system IO
+ */
}
- }
- if (filter != null) {
- filter.reset();
- List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
- for (KeyValue kv : kvs) {
- if (filter.filterAllRemaining()) {
- break;
- }
- if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
- continue;
- }
- Filter.ReturnCode filterResult = filter.filterKeyValue(kv);
- if (filterResult == Filter.ReturnCode.INCLUDE) {
- nkvs.add(kv);
- } else if (filterResult == Filter.ReturnCode.NEXT_ROW) {
- break;
- }
- // ignoring next key hint which is a optimization to reduce file system IO
+ if (filter.hasFilterRow() && !filteredOnRowKey) {
+ filter.filterRowCells(nkvs);
}
- if (filter.hasFilterRow()) {
- filter.filterRow();
+ if (filter.filterRow() || filteredOnRowKey) {
+ nkvs.clear();
}
- kvs = nkvs;
- }
- if (!kvs.isEmpty()) {
- ret.add(new Result(kvs));
- }
- }
-
- return new ResultScanner() {
- private final Iterator<Result> iterator = ret.iterator();
- @Override
- public Iterator<Result> iterator() {
- return iterator;
- }
- @Override
- public Result[] next(int nbRows) throws IOException {
- ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
- for(int i = 0; i < nbRows; i++) {
- Result next = next();
- if (next != null) {
- resultSets.add(next);
- } else {
- break;
- }
+ tmp = nkvs;
+ return tmp;
+ }
+
+ private boolean check(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value) {
+ if (value == null)
+ return !data.containsKey(row) ||
+ !data.get(row).containsKey(family) ||
+ !data.get(row).get(family).containsKey(qualifier);
+ else if (data.containsKey(row) &&
+ data.get(row).containsKey(family) &&
+ data.get(row).get(family).containsKey(qualifier) &&
+ !data.get(row).get(family).get(qualifier).isEmpty()) {
+
+ byte[] oldValue = data.get(row).get(family).get(qualifier).lastEntry().getValue();
+ int compareResult = Bytes.compareTo(value, oldValue);
+ switch (compareOp) {
+ case LESS:
+ return compareResult < 0;
+ case LESS_OR_EQUAL:
+ return compareResult <= 0;
+ case EQUAL:
+ return compareResult == 0;
+ case NOT_EQUAL:
+ return compareResult != 0;
+ case GREATER_OR_EQUAL:
+ return compareResult >= 0;
+ case GREATER:
+ return compareResult > 0;
+ default:
+ throw new RuntimeException("Unknown Compare op " + compareOp.name());
+ }
+ } else {
+ return false;
}
- return resultSets.toArray(new Result[resultSets.size()]);
- }
- @Override
- public Result next() throws IOException {
- try {
- return iterator().next();
- } catch (NoSuchElementException e) {
- return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public CoprocessorRpcChannel coprocessorService(byte[] row) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T extends Service, R> Map<byte[], R> coprocessorService(final Class<T> service, byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable) throws ServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T extends Service, R> void coprocessorService(final Class<T> service, byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable, final Batch.Callback<R> callback) throws ServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setOperationTimeout(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getOperationTimeout() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setRpcTimeout(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getReadRpcTimeout() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setReadRpcTimeout(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getWriteRpcTimeout() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setWriteRpcTimeout(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getRpcTimeout() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws ServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
+ if (check(row, family, qualifier, compareOp, value)) {
+ mutateRow(rm);
+ return true;
}
- }
- @Override
- public void close() {}
- };
- }
- @Override
- public ResultScanner getScanner(byte[] family) throws IOException {
- Scan scan = new Scan();
- scan.addFamily(family);
- return getScanner(scan);
- }
-
- @Override
- public ResultScanner getScanner(byte[] family, byte[] qualifier)
- throws IOException {
- Scan scan = new Scan();
- scan.addColumn(family, qualifier);
- return getScanner(scan);
- }
-
- public List<Put> getPutLog() {
- synchronized (putLog) {
- return ImmutableList.copyOf(putLog);
- }
- }
-
- public void addToPutLog(Put put) {
- synchronized(putLog) {
- putLog.add(put);
- }
- }
-
- public void clear() {
- synchronized (putLog) {
- putLog.clear();
- }
- data.clear();
- }
-
- @Override
- public void put(Put put) throws IOException {
- addToPutLog(put);
-
- byte[] row = put.getRow();
- NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
- for (byte[] family : put.getFamilyMap().keySet()){
- NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
- for (KeyValue kv : put.getFamilyMap().get(family)){
- kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
- byte[] qualifier = kv.getQualifier();
- NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>());
- qualifierData.put(kv.getTimestamp(), kv.getValue());
- }
- }
- }
-
- /**
- * Helper method to find a key in a map. If key is not found, newObject is
- * added to map and returned
- *
- * @param map
- * map to extract value from
- * @param key
- * key to look for
- * @param newObject
- * set key to this if not found
- * @return found value or newObject if not found
- */
- private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject){
- V data = map.get(key);
- if (data == null){
- data = newObject;
- map.put(key, data);
- }
- return data;
- }
-
- @Override
- public void put(List<Put> puts) throws IOException {
- for (Put put : puts)
- put(put);
- }
-
- @Override
- public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Put put) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the put. If the passed value is null, the check
- * is for the lack of column (ie: non-existance)
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param compareOp comparison operator to use
- * @param value the expected value
- * @param put data to put if check succeeds
- * @return true if the new put was executed, false otherwise
- * @throws IOException e
- */
- @Override
- public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
- return false;
- }
-
- @Override
- public void delete(Delete delete) throws IOException {
- byte[] row = delete.getRow();
- if (data.containsKey(row)) {
- data.remove(row);
- } else {
- throw new IOException();
- }
- }
-
- @Override
- public void delete(List<Delete> list) throws IOException {
- throw new UnsupportedOperationException();
-
- }
-
- @Override
- public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Delete delete) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the delete. If the passed value is null, the
- * check is for the lack of column (ie: non-existance)
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param compareOp comparison operator to use
- * @param value the expected value
- * @param delete data to delete if check succeeds
- * @return true if the new delete was executed, false otherwise
- * @throws IOException e
- */
- @Override
- public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
- return false;
- }
-
- @Override
- public void mutateRow(RowMutations rowMutations) throws IOException {
- throw new UnsupportedOperationException();
-
- }
-
- @Override
- public Result append(Append append) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Result increment(Increment increment) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, Durability durability) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- /**
- * @param bytes
- * @param bytes1
- * @param bytes2
- * @param l
- * @param b
- * @deprecated
- */
- @Deprecated
- public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, boolean b) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- public boolean isAutoFlush() {
- return autoflush;
- }
-
- public void flushCommits() throws IOException {
-
- }
-
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public CoprocessorRpcChannel coprocessorService(byte[] bytes) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call) throws ServiceException, Throwable {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T extends Service, R> void coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call, Batch.Callback<R> callback) throws ServiceException, Throwable {
- throw new UnsupportedOperationException();
- }
-
- boolean autoflush = true;
-
- /**
- * @param b
- * @deprecated
- */
- @Deprecated
- public void setAutoFlush(boolean b) {
- autoflush = b;
- }
-
- public void setAutoFlush(boolean b, boolean b1) {
- autoflush = b;
- }
-
- public void setAutoFlushTo(boolean b) {
- autoflush = b;
- }
-
- long writeBufferSize = 0;
- @Override
- public long getWriteBufferSize() {
- return writeBufferSize;
- }
-
- @Override
- public void setWriteBufferSize(long l) throws IOException {
- writeBufferSize = l;
- }
-
- @Override
- public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r) throws ServiceException, Throwable {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r, Batch.Callback<R> callback) throws ServiceException, Throwable {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected value.
- * If it does, it performs the row mutations. If the passed value is null, the check
- * is for the lack of column (ie: non-existence)
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param compareOp the comparison operator
- * @param value the expected value
- * @param mutation mutations to perform if check succeeds
- * @return true if the new put was executed, false otherwise
- * @throws IOException e
- */
- @Override
- public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
- return false;
- }
+ return false;
+ }
}
diff --git a/metron-platform/metron-parsing/metron-parsers/pom.xml b/metron-platform/metron-parsing/metron-parsers/pom.xml
index 0f4bc31..57e4761 100644
--- a/metron-platform/metron-parsing/metron-parsers/pom.xml
+++ b/metron-platform/metron-parsing/metron-parsers/pom.xml
@@ -30,6 +30,15 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
+
+ <!-- Test dependencies that must be declared first so they take precedence -->
+ <dependency>
+ <groupId>com.github.fge</groupId>
+ <artifactId>jackson-coreutils</artifactId>
+ <version>1.8</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- Metron Dependencies -->
<dependency>
<groupId>com.google.guava</groupId>
diff --git a/metron-stellar/stellar-common/pom.xml b/metron-stellar/stellar-common/pom.xml
index c98cb32..24d13c3 100644
--- a/metron-stellar/stellar-common/pom.xml
+++ b/metron-stellar/stellar-common/pom.xml
@@ -332,6 +332,13 @@
<shadedPattern>org.apache.metron.guava.${guava_version}</shadedPattern>
</relocation>
<relocation>
+ <!-- need to also relocate Guava's `thirdparty` packages. otherwise, the class
+ `thirdparty.publicsuffix.PublicSuffixPatterns` cannot be found after
+ relocation of Guava's `com.google.common` packages. -->
+ <pattern>com.google.thirdparty</pattern>
+ <shadedPattern>org.apache.metron.guava.thirdparty.${guava_version}</shadedPattern>
+ </relocation>
+ <relocation>
<pattern>org.apache.commons.beanutils</pattern>
<shadedPattern>org.apache.metron.stellar.beanutils</shadedPattern>
</relocation>
diff --git a/pom.xml b/pom.xml
index f952884..308c167 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,6 +149,8 @@
<global_storm_kafka_version>1.2.3</global_storm_kafka_version>
<global_zeppelin_version>0.8.0</global_zeppelin_version>
<global_solr_version>7.4.0</global_solr_version>
+ <global_hbase_version>2.0.2</global_hbase_version>
+ <global_hbase_guava_version>17.0</global_hbase_guava_version>
</properties>
</profile>
<profile>
@@ -163,6 +165,8 @@
<global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version>
<global_zeppelin_version>0.7.3</global_zeppelin_version>
<global_solr_version>6.6.2</global_solr_version>
+ <global_hbase_version>2.0.2</global_hbase_version>
+ <global_hbase_guava_version>17.0</global_hbase_guava_version>
</properties>
</profile>
</profiles>