You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/07/18 13:56:59 UTC
[metron] branch feature/METRON-2088-support-hdp-3.1 updated:
METRON-2175 Introduce HBase Connection Abstractions for HBase 2.0.2
(nickwallen) closes apache/metron#1456
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push:
new ef3a76c METRON-2175 Introduce HBase Connection Abstractions for HBase 2.0.2 (nickwallen) closes apache/metron#1456
ef3a76c is described below
commit ef3a76c799d48856d8e4999226ec9392f298a2f7
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Thu Jul 18 09:56:31 2019 -0400
METRON-2175 Introduce HBase Connection Abstractions for HBase 2.0.2 (nickwallen) closes apache/metron#1456
---
.../metron/profiler/client/ProfileWriter.java | 6 +-
.../spark/function/HBaseWriterFunction.java | 4 +-
.../org/apache/metron/hbase/bolt/HBaseBolt.java | 8 +-
.../apache/metron/hbase/bolt/HBaseBoltTest.java | 6 +-
.../org/apache/metron/rest/config/HBaseConfig.java | 6 +-
.../impl/SensorEnrichmentConfigServiceImpl.java | 6 +-
.../org/apache/metron/rest/config/TestConfig.java | 6 +-
.../SensorEnrichmentConfigServiceImplTest.java | 6 +-
.../metron-elasticsearch-common/pom.xml | 18 ++
.../metron/hbase/coprocessor/HBaseCacheWriter.java | 4 +-
.../java/org/apache/metron/hbase/ColumnList.java | 97 ++++++-
.../metron/hbase/HBaseProjectionCriteria.java | 10 +-
.../apache/metron/hbase/client/HBaseClient.java | 317 ++++-----------------
.../metron/hbase/client/HBaseClientFactory.java | 70 +++++
.../hbase/client/HBaseConnectionFactory.java | 59 ++++
.../metron/hbase/client/HBaseTableClient.java | 285 ++++++++++++++++++
.../hbase/client/HBaseTableClientFactory.java | 54 ++++
.../metron/hbase/client/HBaseWriterParams.java | 51 ++++
.../{HBaseClient.java => LegacyHBaseClient.java} | 4 +-
.../metron/hbase/client/HBaseClientTest.java | 10 +-
.../HBaseTableClientIntegrationTest.java | 286 +++++++++++++++++++
metron-platform/metron-pcap/pom.xml | 5 +
pom.xml | 26 +-
23 files changed, 1039 insertions(+), 305 deletions(-)
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
index 4e00164..d4e5e66 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.ColumnList;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.ProfilePeriod;
import org.apache.metron.profiler.hbase.ColumnBuilder;
@@ -47,13 +47,13 @@ public class ProfileWriter {
private RowKeyBuilder rowKeyBuilder;
private ColumnBuilder columnBuilder;
- private HBaseClient hbaseClient;
+ private LegacyHBaseClient hbaseClient;
private HBaseProfilerClient client;
public ProfileWriter(RowKeyBuilder rowKeyBuilder, ColumnBuilder columnBuilder, HTableInterface table, long periodDurationMillis) {
this.rowKeyBuilder = rowKeyBuilder;
this.columnBuilder = columnBuilder;
- this.hbaseClient = new HBaseClient((c, t) -> table, table.getConfiguration(), table.getName().getNameAsString());
+ this.hbaseClient = new LegacyHBaseClient((c, t) -> table, table.getConfiguration(), table.getName().getNameAsString());
this.client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDurationMillis);
}
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
index cfabd94..6a090cf 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
@@ -118,7 +118,7 @@ public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasure
// open an HBase connection
Configuration config = HBaseConfiguration.create();
- try (HBaseClient client = new HBaseClient(tableProvider, config, tableName)) {
+ try (LegacyHBaseClient client = new LegacyHBaseClient(tableProvider, config, tableName)) {
for (ProfileMeasurementAdapter adapter : measurements) {
ProfileMeasurement m = adapter.toProfileMeasurement();
diff --git a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
index ec860a5..07bd552 100644
--- a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
+++ b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
@@ -31,7 +31,7 @@ import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.TableProvider;
import org.apache.metron.hbase.ColumnList;
import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -90,7 +90,7 @@ public class HBaseBolt extends BaseRichBolt {
private BatchHelper batchHelper;
protected OutputCollector collector;
- protected transient HBaseClient hbaseClient;
+ protected transient LegacyHBaseClient hbaseClient;
public HBaseBolt(String tableName, HBaseMapper mapper) {
this.tableName = tableName;
@@ -122,7 +122,7 @@ public class HBaseBolt extends BaseRichBolt {
return this;
}
- public void setClient(HBaseClient hbaseClient) {
+ public void setClient(LegacyHBaseClient hbaseClient) {
this.hbaseClient = hbaseClient;
}
@@ -147,7 +147,7 @@ public class HBaseBolt extends BaseRichBolt {
provider = this.tableProvider;
}
- hbaseClient = new HBaseClient(provider, HBaseConfiguration.create(), tableName);
+ hbaseClient = new LegacyHBaseClient(provider, HBaseConfiguration.create(), tableName);
}
@Override
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
index bae3728..9146aff 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
@@ -25,7 +25,7 @@ import org.apache.storm.Constants;
import org.apache.storm.tuple.Tuple;
import org.apache.metron.hbase.bolt.mapper.Widget;
import org.apache.metron.hbase.bolt.mapper.WidgetMapper;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
import org.apache.metron.test.bolt.BaseBoltTest;
import org.junit.Assert;
import org.junit.Before;
@@ -48,7 +48,7 @@ import static org.mockito.Mockito.when;
public class HBaseBoltTest extends BaseBoltTest {
private static final String tableName = "widgets";
- private HBaseClient client;
+ private LegacyHBaseClient client;
private Tuple tuple1;
private Tuple tuple2;
private Widget widget1;
@@ -71,7 +71,7 @@ public class HBaseBoltTest extends BaseBoltTest {
public void setup() throws Exception {
tuple1 = mock(Tuple.class);
tuple2 = mock(Tuple.class);
- client = mock(HBaseClient.class);
+ client = mock(LegacyHBaseClient.class);
provider = mock(TableProvider.class);
}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java
index 7ce16f9..c1476e8 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.GlobalConfigService;
import org.apache.metron.rest.user.UserSettingsClient;
@@ -60,7 +60,7 @@ public class HBaseConfig {
}
@Bean()
- public HBaseClient hBaseClient() {
+ public LegacyHBaseClient hBaseClient() {
Map<String, Object> restConfig = null;
try {
restConfig = globalConfigService.get();
@@ -75,7 +75,7 @@ public class HBaseConfig {
} catch (ClassNotFoundException | InstantiationException | InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
throw new IllegalStateException("Unable to create table provider", e);
}
- return new HBaseClient(provider, HBaseConfiguration.create(),
+ return new LegacyHBaseClient(provider, HBaseConfiguration.create(),
(String) restConfig.get(EnrichmentConfigurations.TABLE_NAME));
}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
index 5c0f2e0..d7f7b7f 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
@@ -32,7 +32,7 @@ import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
import org.apache.metron.common.zookeeper.ConfigurationsCache;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.SensorEnrichmentConfigService;
import org.apache.zookeeper.KeeperException;
@@ -48,12 +48,12 @@ public class SensorEnrichmentConfigServiceImpl implements SensorEnrichmentConfig
private ConfigurationsCache cache;
- private HBaseClient hBaseClient;
+ private LegacyHBaseClient hBaseClient;
@Autowired
public SensorEnrichmentConfigServiceImpl(final ObjectMapper objectMapper,
final CuratorFramework client, final ConfigurationsCache cache,
- final HBaseClient hBaseClient) {
+ final LegacyHBaseClient hBaseClient) {
this.objectMapper = objectMapper;
this.client = client;
this.cache = cache;
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index d42f128..b3a478b 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -46,7 +46,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.UnableToStartException;
@@ -202,7 +202,7 @@ public class TestConfig {
}
@Bean()
- public HBaseClient hBaseClient() throws RestException, IOException {
+ public LegacyHBaseClient hBaseClient() throws RestException, IOException {
final String cf = "t";
final String cq = "v";
HTableInterface table = MockHBaseTableProvider.addToCache("enrichment_list", cf);
@@ -216,7 +216,7 @@ public class TestConfig {
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cq), "{}".getBytes(StandardCharsets.UTF_8));
table.put(put);
}
- return new HBaseClient(new MockHBaseTableProvider(), HBaseConfiguration.create(),
+ return new LegacyHBaseClient(new MockHBaseTableProvider(), HBaseConfiguration.create(),
"enrichment_list");
}
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
index 3d07da5..4176121 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
@@ -42,7 +42,7 @@ import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
import org.apache.metron.common.configuration.enrichment.threatintel.ThreatIntelConfig;
import org.apache.metron.common.zookeeper.ConfigurationsCache;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.SensorEnrichmentConfigService;
import org.apache.zookeeper.KeeperException;
@@ -82,14 +82,14 @@ public class SensorEnrichmentConfigServiceImplTest {
public static String broJson;
ConfigurationsCache cache;
- private HBaseClient hBaseClient;
+ private LegacyHBaseClient hBaseClient;
@Before
public void setUp() throws Exception {
objectMapper = mock(ObjectMapper.class);
curatorFramework = mock(CuratorFramework.class);
cache = mock(ConfigurationsCache.class);
- hBaseClient = mock(HBaseClient.class);
+ hBaseClient = mock(LegacyHBaseClient.class);
sensorEnrichmentConfigService = new SensorEnrichmentConfigServiceImpl(objectMapper, curatorFramework, cache, hBaseClient);
}
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml
index 190bcb2..ce61df3 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml
@@ -59,9 +59,23 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
+ <exclusion>
+ <artifactId>commons-httpclient</artifactId>
+ <groupId>commons-httpclient</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.4.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.9</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava_version}</version>
@@ -178,6 +192,10 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
+ <exclusion>
+ <artifactId>commons-httpclient</artifactId>
+ <groupId>commons-httpclient</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git a/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/HBaseCacheWriter.java b/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/HBaseCacheWriter.java
index b1bbdde..5d19960 100644
--- a/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/HBaseCacheWriter.java
+++ b/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/HBaseCacheWriter.java
@@ -26,7 +26,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +59,7 @@ public class HBaseCacheWriter implements CacheWriter<String, String> {
@Override
public void write(@Nonnull String key, @Nonnull String value) {
LOG.debug("Calling hbase cache writer with key='{}', value='{}'", key, value);
- try (HBaseClient hbClient = new HBaseClient(this.tableProvider, this.config, this.tableName)) {
+ try (LegacyHBaseClient hbClient = new LegacyHBaseClient(this.tableProvider, this.config, this.tableName)) {
LOG.debug("rowKey={}, columnFamily={}, columnQualifier={}, value={}", key, columnFamily,
columnQualifier, value);
hbClient.put(key, columnFamily, columnQualifier, value);
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/ColumnList.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/ColumnList.java
index 01a5cc6..33af434 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/ColumnList.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/ColumnList.java
@@ -20,8 +20,12 @@
package org.apache.metron.hbase;
+import org.apache.hadoop.hbase.util.Bytes;
+
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
/**
* Represents a list of HBase columns.
@@ -76,6 +80,30 @@ public class ColumnList {
public long getTs() {
return ts;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof Column)) return false;
+ Column column = (Column) o;
+ return ts == column.ts &&
+ Arrays.equals(value, column.value);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(ts);
+ result = 31 * result + Arrays.hashCode(value);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "Column{" +
+ "value=" + Arrays.toString(value) +
+ ", ts=" + ts +
+ '}';
+ }
}
public static class Counter extends AbstractColumn {
@@ -88,6 +116,26 @@ public class ColumnList {
public long getIncrement() {
return incr;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof Counter)) return false;
+ Counter counter = (Counter) o;
+ return incr == counter.incr;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(incr);
+ }
+
+ @Override
+ public String toString() {
+ return "Counter{" +
+ "incr=" + incr +
+ '}';
+ }
}
private ArrayList<ColumnList.Column> columns;
@@ -123,6 +171,24 @@ public class ColumnList {
return this;
}
+ public ColumnList addColumn(byte[] family, byte[] qualifier){
+ addColumn(new Column(family, qualifier, -1, null));
+ return this;
+ }
+
+ public ColumnList addColumn(String family, String qualifier){
+ addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
+ return this;
+ }
+
+ public ColumnList addColumn(String family, String qualifier, byte[] value){
+ return addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
+ }
+
+ public ColumnList addColumn(String family, String qualifier, String value){
+ return addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
+ }
+
/**
* Add a standard HBase column given an instance of a class that implements
* the <code>IColumn</code> interface.
@@ -131,6 +197,11 @@ public class ColumnList {
return this.addColumn(column.family(), column.qualifier(), column.timestamp(), column.value());
}
+ public ColumnList addColumn(Column column){
+ columns().add(column);
+ return this;
+ }
+
/**
* Add an HBase counter column.
*/
@@ -152,14 +223,14 @@ public class ColumnList {
* Query to determine if we have column definitions.
*/
public boolean hasColumns(){
- return this.columns != null;
+ return columns != null && columns.size() > 0;
}
/**
* Query to determine if we have counter definitions.
*/
public boolean hasCounters(){
- return this.counters != null;
+ return this.counters != null && counters.size() > 0;
}
/**
@@ -175,4 +246,26 @@ public class ColumnList {
public List<Counter> getCounters(){
return this.counters;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof ColumnList)) return false;
+ ColumnList that = (ColumnList) o;
+ return Objects.equals(columns, that.columns) &&
+ Objects.equals(counters, that.counters);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(columns, counters);
+ }
+
+ @Override
+ public String toString() {
+ return "ColumnList{" +
+ "columns=" + columns +
+ ", counters=" + counters +
+ '}';
+ }
}
\ No newline at end of file
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseProjectionCriteria.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseProjectionCriteria.java
index 7432b02..6bacfb2 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseProjectionCriteria.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseProjectionCriteria.java
@@ -40,12 +40,16 @@ public class HBaseProjectionCriteria implements Serializable {
public static class ColumnMetaData implements Serializable {
- private byte[] columnFamily;
+ private byte[] columnFamily;
private byte[] qualifier;
public ColumnMetaData(String columnFamily, String qualifier) {
- this.columnFamily = columnFamily.getBytes();
- this.qualifier = qualifier.getBytes();
+ this(columnFamily.getBytes(), qualifier.getBytes());
+ }
+
+ public ColumnMetaData(byte[] columnFamily, byte[] qualifier) {
+ this.columnFamily = columnFamily;
+ this.qualifier = qualifier;
}
public byte[] getColumnFamily() {
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
index d0d934e..f51927a 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
@@ -17,321 +17,114 @@
* limitations under the License.
*
*/
-
package org.apache.metron.hbase.client;
-import static org.apache.commons.collections4.CollectionUtils.size;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.hbase.TableProvider;
import org.apache.metron.hbase.ColumnList;
import org.apache.metron.hbase.HBaseProjectionCriteria;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
/**
* A client that interacts with HBase.
*/
-public class HBaseClient implements Closeable {
-
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+public interface HBaseClient extends Closeable {
/**
- * The batch of queued Mutations.
+ * Enqueues a 'get' request that will be submitted when {@link #getAll()} is called.
+ * @param rowKey The row key to be retrieved.
*/
- List<Mutation> mutations;
+ void addGet(byte[] rowKey, HBaseProjectionCriteria criteria);
/**
- * The batch of queued Gets.
+ * Submits all pending get operations and returns the result of each.
+ * @return The result of each pending get request.
*/
- List<Get> gets;
-
- /**
- * The HBase table this client interacts with.
- */
- private HTableInterface table;
-
- public HBaseClient(TableProvider provider, final Configuration configuration, final String tableName) {
- this.mutations = new ArrayList<>();
- this.gets = new ArrayList<>();
- try {
- this.table = provider.getTable(configuration, tableName);
- } catch (Exception e) {
- String msg = String.format("Unable to open connection to HBase for table '%s'", tableName);
- LOG.error(msg, e);
- throw new RuntimeException(msg, e);
- }
- }
+ Result[] getAll();
/**
- * Add a Mutation such as a Put or Increment to the batch. The Mutation is only queued for
- * later execution.
- *
- * @param rowKey The row key of the Mutation.
- * @param cols The columns affected by the Mutation.
- * @param durability The durability of the mutation.
+ * Clears all pending get operations.
*/
- public void addMutation(byte[] rowKey, ColumnList cols, Durability durability) {
-
- if (cols.hasColumns()) {
- Put put = createPut(rowKey, cols, durability);
- mutations.add(put);
- }
-
- if (cols.hasCounters()) {
- Increment inc = createIncrement(rowKey, cols, durability);
- mutations.add(inc);
- }
-
- if (mutations.isEmpty()) {
- mutations.add(new Put(rowKey));
- }
- }
+ void clearGets();
/**
- * Adds a Mutation such as a Put or Increment with a time to live. The Mutation is only queued
- * for later execution.
+ * Scans an entire table returning all row keys as a List of Strings.
*
- * @param rowKey The row key of the Mutation.
- * @param cols The columns affected by the Mutation.
- * @param durability The durability of the mutation.
- * @param timeToLiveMillis The time to live in milliseconds.
- */
- public void addMutation(byte[] rowKey, ColumnList cols, Durability durability, Long timeToLiveMillis) {
-
- if (cols.hasColumns()) {
- Put put = createPut(rowKey, cols, durability, timeToLiveMillis);
- mutations.add(put);
- }
-
- if (cols.hasCounters()) {
- Increment inc = createIncrement(rowKey, cols, durability, timeToLiveMillis);
- mutations.add(inc);
- }
-
- if (mutations.isEmpty()) {
- Put put = new Put(rowKey);
- put.setTTL(timeToLiveMillis);
- mutations.add(put);
- }
- }
-
- /**
- * Remove all queued Mutations from the batch.
- */
- public void clearMutations() {
- mutations.clear();
- }
-
- /**
- * Submits all queued Mutations.
- * @return The number of mutation submitted.
- */
- public int mutate() {
- int mutationCount = mutations.size();
- Object[] result = new Object[mutationCount];
- try {
- table.batch(mutations, result);
- mutations.clear();
-
- } catch (Exception e) {
- String msg = String.format("'%d' HBase write(s) failed on table '%s'", size(mutations), tableName(table));
- LOG.error(msg, e);
- throw new RuntimeException(msg, e);
- }
-
- return mutationCount;
- }
-
- /**
- * Adds a Get to the batch.
+ * <p><b>**WARNING**:</b> Do not use this method unless you're absolutely crystal clear about the performance
+ * impact. Doing full table scans in HBase can adversely impact performance.
*
- * @param rowKey The row key of the Get
- * @param criteria Defines the columns/families that will be retrieved.
- */
- public void addGet(byte[] rowKey, HBaseProjectionCriteria criteria) {
- Get get = new Get(rowKey);
-
- if (criteria != null) {
- criteria.getColumnFamilies().forEach(cf -> get.addFamily(cf));
- criteria.getColumns().forEach(col -> get.addColumn(col.getColumnFamily(), col.getQualifier()));
- }
-
- // queue the get
- this.gets.add(get);
- }
-
- /**
- * Clears all queued Gets from the batch.
+ * @return List of all row keys as Strings for this table.
*/
- public void clearGets() {
- gets.clear();
- }
+ List<String> scanRowKeys() throws IOException;
/**
- * Submit all queued Gets.
+ * Scans the table and returns each result.
*
- * @return The Result of each queued Get.
- */
- public Result[] getAll() {
- try {
- Result[] results = table.get(gets);
- gets.clear();
- return results;
-
- } catch (Exception e) {
- String msg = String.format("'%d' HBase read(s) failed on table '%s'", size(gets), tableName(table));
- LOG.error(msg, e);
- throw new RuntimeException(msg, e);
- }
- }
-
- /**
- * Close the table.
- */
- @Override
- public void close() throws IOException {
- if(table != null) {
- table.close();
- }
- }
-
- /**
- * Creates an HBase Put.
+ * <p><b>**WARNING**:</b> Do not use this method unless you're absolutely crystal clear about the performance
+ * impact. Doing full table scans in HBase can adversely impact performance.
*
- * @param rowKey The row key.
- * @param cols The columns to put.
- * @param durability The durability of the put.
+ * @return The results from the scan.
+ * @throws IOException If the scan fails.
*/
- private Put createPut(byte[] rowKey, ColumnList cols, Durability durability) {
- Put put = new Put(rowKey);
- put.setDurability(durability);
- addColumns(cols, put);
- return put;
- }
+ Result[] scan(int numRows) throws IOException;
/**
- * Creates an HBase Put.
+ * Enqueues a {@link org.apache.hadoop.hbase.client.Mutation} such as a put or
+ * increment. The operation is enqueued for later execution.
*
- * @param rowKey The row key.
- * @param cols The columns to put.
- * @param durability The durability of the put.
- * @param timeToLiveMillis The TTL in milliseconds.
+ * @param rowKey The row key of the Mutation.
+ * @param cols The columns affected by the Mutation.
*/
- private Put createPut(byte[] rowKey, ColumnList cols, Durability durability, long timeToLiveMillis) {
- Put put = new Put(rowKey);
- put.setDurability(durability);
- put.setTTL(timeToLiveMillis);
- addColumns(cols, put);
- return put;
- }
+ void addMutation(byte[] rowKey, ColumnList cols);
/**
- * Adds the columns to the Put
+ * Enqueues a {@link org.apache.hadoop.hbase.client.Mutation} such as a put or
+ * increment. The operation is enqueued for later execution.
*
- * @param cols The columns to add.
- * @param put The Put.
+ * @param rowKey The row key of the Mutation.
+ * @param cols The columns affected by the Mutation.
+ * @param durability The durability of the mutation.
*/
- private void addColumns(ColumnList cols, Put put) {
- for (ColumnList.Column col : cols.getColumns()) {
-
- if (col.getTs() > 0) {
- put.add(col.getFamily(), col.getQualifier(), col.getTs(), col.getValue());
-
- } else {
- put.add(col.getFamily(), col.getQualifier(), col.getValue());
- }
- }
- }
+ void addMutation(byte[] rowKey, ColumnList cols, Durability durability);
/**
- * Creates an HBase Increment for a counter.
+ * Enqueues a {@link org.apache.hadoop.hbase.client.Mutation} such as a put or
+ * increment. The operation is enqueued for later execution.
*
- * @param rowKey The row key.
- * @param cols The columns to include.
- * @param durability The durability of the increment.
+ * @param rowKey The row key of the Mutation.
+ * @param cols The columns affected by the Mutation.
+ * @param durability The durability of the mutation.
+ * @param timeToLiveMillis The time to live in milliseconds.
*/
- private Increment createIncrement(byte[] rowKey, ColumnList cols, Durability durability) {
- Increment inc = new Increment(rowKey);
- inc.setDurability(durability);
- cols.getCounters().forEach(cnt -> inc.addColumn(cnt.getFamily(), cnt.getQualifier(), cnt.getIncrement()));
- return inc;
- }
+ void addMutation(byte[] rowKey, ColumnList cols, Durability durability, Long timeToLiveMillis);
/**
- * Creates an HBase Increment for a counter.
+ * Ensures that all pending mutations have completed.
*
- * @param rowKey The row key.
- * @param cols The columns to include.
- * @param durability The durability of the increment.
+ * @return The number of operations completed.
*/
- private Increment createIncrement(byte[] rowKey, ColumnList cols, Durability durability, long timeToLiveMillis) {
- Increment inc = new Increment(rowKey);
- inc.setDurability(durability);
- inc.setTTL(timeToLiveMillis);
- cols.getCounters().forEach(cnt -> inc.addColumn(cnt.getFamily(), cnt.getQualifier(), cnt.getIncrement()));
- return inc;
- }
+ int mutate();
/**
- * Returns the name of the HBase table.
- * <p>Attempts to avoid any null pointers that might be encountered along the way.
- * @param table The table to retrieve the name of.
- * @return The name of the table
+ * Clears all pending mutations.
*/
- private static String tableName(HTableInterface table) {
- String tableName = "null";
- if(table != null) {
- if(table.getName() != null) {
- tableName = table.getName().getNameAsString();
- }
- }
- return tableName;
- }
+ void clearMutations();
/**
- * Puts a record into the configured HBase table synchronously (not batched).
+ * Delete a record by row key.
+ *
+ * @param rowKey The row key to delete.
*/
- public void put(String rowKey, String columnFamily, String columnQualifier, String value)
- throws IOException {
- Put put = new Put(Bytes.toBytes(rowKey));
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier),
- Bytes.toBytes(value));
- table.put(put);
- }
+ void delete(byte[] rowKey);
/**
- * Scans an entire table returning all row keys as a List of Strings.
+ * Delete a column or set of columns by row key.
*
- * <p>
- * <b>**WARNING**:</b> Do not use this method unless you're absolutely crystal clear about the performance
- * impact. Doing full table scans in HBase can adversely impact performance.
- *
- * @return List of all row keys as Strings for this table.
+ * @param rowKey The row key to delete.
+ * @param columnList The set of columns to delete.
*/
- public List<String> readRecords() throws IOException {
- Scan scan = new Scan();
- ResultScanner scanner = table.getScanner(scan);
- List<String> rows = new ArrayList<>();
- for (Result r = scanner.next(); r != null; r = scanner.next()) {
- rows.add(Bytes.toString(r.getRow()));
- }
- return rows;
- }
-
+ void delete(byte[] rowKey, ColumnList columnList);
}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClientFactory.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClientFactory.java
new file mode 100644
index 0000000..b38b07b
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClientFactory.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
+import java.util.function.Supplier;
+
+/**
+ * Responsible for creating an {@link HBaseClient}.
+ *
+ * <p>Implementations that are created through {@link #byName(String, Supplier)} are
+ * instantiated reflectively and therefore need a public, no-argument constructor.
+ */
+public interface HBaseClientFactory extends Serializable {
+  Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Creates a client for a single HBase table.
+   *
+   * @param factory The connection factory for creating connections to HBase.
+   * @param configuration The HBase configuration.
+   * @param tableName The name of the HBase table.
+   * @return An {@link HBaseClient}.
+   */
+  HBaseClient create(HBaseConnectionFactory factory, Configuration configuration, String tableName);
+
+  /**
+   * Instantiates a new {@link HBaseClientFactory} by class name.
+   *
+   * @param className The class name of the {@link HBaseClientFactory} to instantiate.
+   * @param defaultImpl Supplies the factory to use when the className is null, empty, or
+   *                    starts with '$'.
+   * @return A new {@link HBaseClientFactory}.
+   * @throws IllegalStateException If the named class cannot be loaded, is not an
+   *                               {@link HBaseClientFactory}, or cannot be instantiated.
+   */
+  static HBaseClientFactory byName(String className, Supplier<HBaseClientFactory> defaultImpl) {
+    LOG.debug("Creating HBase client creator; className={}", className);
+
+    // NOTE(review): a leading '$' presumably marks an unresolved property placeholder such as "${...}" - confirm
+    if(className == null || className.length() == 0 || className.charAt(0) == '$') {
+      LOG.debug("Using default hbase client creator");
+      return defaultImpl.get();
+    }
+
+    try {
+      // asSubclass() checks the type before any constructor of the named class is run
+      Class<? extends HBaseClientFactory> clazz = Class.forName(className).asSubclass(HBaseClientFactory.class);
+      return clazz.getConstructor().newInstance();
+
+    } catch(InstantiationException | IllegalAccessException | InvocationTargetException |
+            NoSuchMethodException | ClassNotFoundException | ClassCastException e) {
+      throw new IllegalStateException("Unable to instantiate connector.", e);
+    }
+  }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseConnectionFactory.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseConnectionFactory.java
new file mode 100644
index 0000000..bda26c5
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseConnectionFactory.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Establishes a {@link Connection} to HBase.
+ *
+ * <p>Subclasses can override {@link #createConnection(Configuration)} to supply a different
+ * kind of connection; {@link #byName(String)} instantiates such a subclass reflectively, which
+ * requires it to have a public, no-argument constructor.
+ */
+public class HBaseConnectionFactory implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Opens a new connection by delegating to {@link ConnectionFactory#createConnection(Configuration)}.
+   *
+   * @param configuration The HBase configuration.
+   * @return A new {@link Connection}; the caller is expected to close it.
+   * @throws IOException If the connection cannot be created.
+   */
+  public Connection createConnection(Configuration configuration) throws IOException {
+    return ConnectionFactory.createConnection(configuration);
+  }
+
+  /**
+   * Creates an {@link HBaseConnectionFactory} based on a fully-qualified class name.
+   *
+   * @param className The fully-qualified class name to instantiate.
+   * @return A {@link HBaseConnectionFactory}.
+   * @throws IllegalStateException If the named class cannot be loaded, is not an
+   *                               {@link HBaseConnectionFactory}, or cannot be instantiated.
+   */
+  public static HBaseConnectionFactory byName(String className) {
+    LOG.debug("Creating HBase connection factory; className={}", className);
+    try {
+      // asSubclass() checks the type before any constructor of the named class is run
+      Class<? extends HBaseConnectionFactory> clazz = Class.forName(className).asSubclass(HBaseConnectionFactory.class);
+      return clazz.getConstructor().newInstance();
+
+    } catch (InstantiationException | NoSuchMethodException | IllegalAccessException |
+             ClassNotFoundException | InvocationTargetException | ClassCastException e) {
+      throw new IllegalStateException("Unable to instantiate HBaseConnectionFactory.", e);
+    }
+  }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java
new file mode 100644
index 0000000..60d2328
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java
@@ -0,0 +1,285 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.ColumnList;
+import org.apache.metron.hbase.HBaseProjectionCriteria;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.commons.collections4.CollectionUtils.size;
+
+/**
+ * An {@link HBaseClient} that uses the {@link Table} API to interact with HBase.
+ *
+ * <p>Gets and mutations are queued in memory and executed in bulk by {@link #getAll()} and
+ * {@link #mutate()}; scans and deletes are executed immediately. The queues are plain lists
+ * and this class does no synchronization of its own.
+ */
+public class HBaseTableClient implements HBaseClient {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Mutations queued by {@code addMutation}; emptied after they are written or cleared. */
+  private List<Mutation> mutations;
+
+  /** Gets queued by {@code addGet}; emptied after they are executed or cleared. */
+  private List<Get> gets;
+
+  /** The connection that {@link #table} was obtained from; closed by {@link #close()}. */
+  private Connection connection;
+
+  /** The table that every operation is executed against. */
+  private Table table;
+
+  /**
+   * Opens a connection to HBase and a handle to the named table.
+   *
+   * @param connectionFactory Creates connections to HBase.
+   * @param configuration The HBase configuration.
+   * @param tableName The name of the HBase table.
+   * @throws IOException If the connection or the table cannot be opened.
+   */
+  public HBaseTableClient(HBaseConnectionFactory connectionFactory, Configuration configuration, String tableName) throws IOException {
+    gets = new ArrayList<>();
+    mutations = new ArrayList<>();
+    connection = connectionFactory.createConnection(configuration);
+    try {
+      table = connection.getTable(TableName.valueOf(tableName));
+
+    } catch (IOException | RuntimeException e) {
+      // the caller never receives this instance, so nothing else could close the connection
+      try {
+        connection.close();
+      } catch (IOException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Closes the table and then the connection. A failure to close either one is logged
+   * rather than thrown, so that the other is still closed.
+   */
+  @Override
+  public void close() {
+    try {
+      if(table != null) {
+        table.close();
+      }
+    } catch(IOException e) {
+      LOG.error("Error while closing HBase table", e);
+    }
+
+    try {
+      if(connection != null) {
+        connection.close();
+      }
+    } catch(IOException e) {
+      LOG.error("Error while closing HBase connection",e);
+    }
+  }
+
+  /**
+   * Queues a get that is executed by the next call to {@link #getAll()}.
+   *
+   * @param rowKey The row key to get.
+   * @param criteria The column families and columns to fetch; if null, the get is not restricted.
+   */
+  @Override
+  public void addGet(byte[] rowKey, HBaseProjectionCriteria criteria) {
+    Get get = new Get(rowKey);
+
+    // define which column families and columns are needed
+    if (criteria != null) {
+      criteria.getColumnFamilies().forEach(cf -> get.addFamily(cf));
+      criteria.getColumns().forEach(col -> get.addColumn(col.getColumnFamily(), col.getQualifier()));
+    }
+
+    // queue the get
+    this.gets.add(get);
+  }
+
+  /**
+   * Executes all of the queued gets in one request. The queue is emptied whether or not
+   * the request succeeds.
+   *
+   * @return The results returned by the table for the queued gets.
+   * @throws RuntimeException If the read fails.
+   */
+  @Override
+  public Result[] getAll() {
+    try {
+      return table.get(gets);
+
+    } catch (Exception e) {
+      String msg = String.format("'%d' HBase read(s) failed on table '%s'", size(gets), tableName(table));
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+
+    } finally {
+      gets.clear();
+    }
+  }
+
+  /**
+   * Discards any queued gets without executing them.
+   */
+  @Override
+  public void clearGets() {
+    gets.clear();
+  }
+
+  /**
+   * Scans the entire table and returns every row key as a String.
+   *
+   * @return The row keys of all rows in the table.
+   * @throws IOException If the scan fails.
+   */
+  @Override
+  public List<String> scanRowKeys() throws IOException {
+    List<String> rowKeys = new ArrayList<>();
+    // close the scanner so that its resources are released even if the scan fails midway
+    try (ResultScanner scanner = getScanner()) {
+      for (Result r = scanner.next(); r != null; r = scanner.next()) {
+        String rowKeyAsString = Bytes.toString(r.getRow());
+        rowKeys.add(rowKeyAsString);
+      }
+    }
+    return rowKeys;
+  }
+
+  /**
+   * Scans the table and returns at most {@code numRows} rows.
+   *
+   * @param numRows The maximum number of rows to return.
+   * @return The rows that were read; fewer than {@code numRows} if the table has fewer rows.
+   * @throws IOException If the scan fails.
+   */
+  @Override
+  public Result[] scan(int numRows) throws IOException {
+    // the results are fully materialized by next(int), so the scanner can be closed before returning
+    try (ResultScanner scanner = getScanner()) {
+      return scanner.next(numRows);
+    }
+  }
+
+  /**
+   * Opens an unrestricted scanner over the table. The caller must close it.
+   */
+  private ResultScanner getScanner() throws IOException {
+    Scan scan = new Scan();
+    return table.getScanner(scan);
+  }
+
+  /**
+   * Returns the name of the HBase table.
+   * <p>Attempts to avoid any null pointers that might be encountered along the way.
+   * @param table The table to retrieve the name of.
+   * @return The name of the table
+   */
+  private static String tableName(Table table) {
+    String tableName = "null";
+    if(table != null) {
+      if(table.getName() != null) {
+        tableName = table.getName().getNameAsString();
+      }
+    }
+    return tableName;
+  }
+
+  /**
+   * Queues a mutation using the default durability and no time-to-live.
+   *
+   * @param rowKey The row key to mutate.
+   * @param cols The columns and/or counters to write.
+   */
+  @Override
+  public void addMutation(byte[] rowKey, ColumnList cols) {
+    HBaseWriterParams params = new HBaseWriterParams();
+    addMutation(rowKey, cols, params);
+  }
+
+  /**
+   * Queues a mutation with the given durability and no time-to-live.
+   *
+   * @param rowKey The row key to mutate.
+   * @param cols The columns and/or counters to write.
+   * @param durability The durability of the mutation.
+   */
+  @Override
+  public void addMutation(byte[] rowKey, ColumnList cols, Durability durability) {
+    HBaseWriterParams params = new HBaseWriterParams()
+            .withDurability(durability);
+    addMutation(rowKey, cols, params);
+  }
+
+  /**
+   * Queues a mutation with the given durability and time-to-live.
+   *
+   * @param rowKey The row key to mutate.
+   * @param cols The columns and/or counters to write.
+   * @param durability The durability of the mutation.
+   * @param timeToLiveMillis The time-to-live in milliseconds; ignored when null or not positive.
+   */
+  @Override
+  public void addMutation(byte[] rowKey, ColumnList cols, Durability durability, Long timeToLiveMillis) {
+    HBaseWriterParams params = new HBaseWriterParams()
+            .withDurability(durability)
+            .withTimeToLive(timeToLiveMillis);
+    addMutation(rowKey, cols, params);
+  }
+
+  /**
+   * Queues a {@link Put} for the standard columns and an {@link Increment} for the counters
+   * of the column list; either is skipped when the list has none of that kind.
+   */
+  private void addMutation(byte[] rowKey, ColumnList cols, HBaseWriterParams params) {
+    if (cols.hasColumns()) {
+      Put put = createPut(rowKey, params);
+      addColumns(cols, put);
+      mutations.add(put);
+    }
+    if (cols.hasCounters()) {
+      Increment inc = createIncrement(rowKey, params);
+      addColumns(cols, inc);
+      mutations.add(inc);
+    }
+  }
+
+  /**
+   * Discards any queued mutations without writing them.
+   */
+  @Override
+  public void clearMutations() {
+    mutations.clear();
+  }
+
+  /**
+   * Writes all of the queued mutations as one batch.
+   *
+   * @return The number of mutations that were queued when this method was called.
+   * @throws RuntimeException If the write fails.
+   */
+  @Override
+  public int mutate() {
+    int mutationCount = mutations.size();
+    if(mutationCount > 0) {
+      doMutate();
+    }
+
+    return mutationCount;
+  }
+
+  /**
+   * Immediately deletes a row; this does not go through the mutation queue.
+   *
+   * @param rowKey The row key to delete.
+   * @throws RuntimeException If the delete fails.
+   */
+  @Override
+  public void delete(byte[] rowKey) {
+    try {
+      Delete delete = new Delete(rowKey);
+      table.delete(delete);
+
+    } catch (Exception e) {
+      String msg = String.format("Unable to delete; table=%s", tableName(table));
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+    }
+  }
+
+  /**
+   * Immediately deletes a set of columns from a row; this does not go through the mutation queue.
+   *
+   * @param rowKey The row key to delete from.
+   * @param columnList The set of columns to delete.
+   * @throws RuntimeException If the delete fails.
+   */
+  @Override
+  public void delete(byte[] rowKey, ColumnList columnList) {
+    try {
+      Delete delete = new Delete(rowKey);
+      for(ColumnList.Column column: columnList.getColumns()) {
+        // NOTE(review): per the HBase client API, addColumn() removes only the latest version of
+        // the column; addColumns() would remove every version - confirm which is intended
+        delete.addColumn(column.getFamily(), column.getQualifier());
+      }
+      table.delete(delete);
+
+    } catch (Exception e) {
+      String msg = String.format("Unable to delete; table=%s", tableName(table));
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+    }
+  }
+
+  /**
+   * Submits the queued mutations as a single batch. The queue is emptied whether or not
+   * the batch succeeds.
+   */
+  private void doMutate() {
+    Object[] result = new Object[mutations.size()];
+    try {
+      table.batch(mutations, result);
+
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // restore the interrupt flag so that code further up the stack can still observe it
+        Thread.currentThread().interrupt();
+      }
+      String msg = String.format("'%d' HBase write(s) failed on table '%s'", size(mutations), tableName(table));
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+
+    } finally {
+      mutations.clear();
+    }
+  }
+
+  /**
+   * Creates an empty {@link Put} carrying the durability and, if positive, the time-to-live.
+   */
+  private Put createPut(byte[] rowKey, HBaseWriterParams params) {
+    Put put = new Put(rowKey);
+    // the TTL is a boxed Long; guard against null before comparing to avoid an NPE on unboxing
+    Long timeToLiveMillis = params.getTimeToLiveMillis();
+    if(timeToLiveMillis != null && timeToLiveMillis > 0) {
+      put.setTTL(timeToLiveMillis);
+    }
+    put.setDurability(params.getDurability());
+    return put;
+  }
+
+  /**
+   * Copies the standard columns into the put, using the column's own timestamp when it is positive.
+   */
+  private void addColumns(ColumnList cols, Put put) {
+    for (ColumnList.Column col: cols.getColumns()) {
+      if (col.getTs() > 0) {
+        put.addColumn(col.getFamily(), col.getQualifier(), col.getTs(), col.getValue());
+      } else {
+        put.addColumn(col.getFamily(), col.getQualifier(), col.getValue());
+      }
+    }
+  }
+
+  /**
+   * Copies the counters into the increment.
+   */
+  private void addColumns(ColumnList cols, Increment inc) {
+    cols.getCounters().forEach(cnt ->
+            inc.addColumn(cnt.getFamily(), cnt.getQualifier(), cnt.getIncrement()));
+  }
+
+  /**
+   * Creates an empty {@link Increment} carrying the durability and, if positive, the time-to-live.
+   */
+  private Increment createIncrement(byte[] rowKey, HBaseWriterParams params) {
+    Increment inc = new Increment(rowKey);
+    // the TTL is a boxed Long; guard against null before comparing to avoid an NPE on unboxing
+    Long timeToLiveMillis = params.getTimeToLiveMillis();
+    if(timeToLiveMillis != null && timeToLiveMillis > 0) {
+      inc.setTTL(timeToLiveMillis);
+    }
+    inc.setDurability(params.getDurability());
+    return inc;
+  }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClientFactory.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClientFactory.java
new file mode 100644
index 0000000..8697e72
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClientFactory.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+
+/**
+ * An {@link HBaseClientFactory} whose clients are {@link HBaseTableClient} instances.
+ */
+public class HBaseTableClientFactory implements HBaseClientFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Builds an {@link HBaseTableClient} for one table.
+   *
+   * @param factory The factory that creates connections to HBase.
+   * @param configuration The HBase configuration.
+   * @param tableName The name of the HBase table.
+   * @return The newly created {@link HBaseTableClient}.
+   * @throws RuntimeException Wrapping whatever failure prevented the client from being created.
+   */
+  @Override
+  public HBaseClient create(HBaseConnectionFactory factory,
+                            Configuration configuration,
+                            String tableName) {
+    HBaseClient client;
+    try {
+      LOG.debug("Creating HBase client; table={}", tableName);
+      client = new HBaseTableClient(factory, configuration, tableName);
+
+    } catch (Exception cause) {
+      // log here as well as rethrowing, so the table name is recorded with the failure
+      String errorMessage = String.format("Unable to open connection to HBase for table '%s'", tableName);
+      LOG.error(errorMessage, cause);
+      throw new RuntimeException(errorMessage, cause);
+    }
+    return client;
+  }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseWriterParams.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseWriterParams.java
new file mode 100644
index 0000000..ec93177
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseWriterParams.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.hbase.client.Durability;
+
+/**
+ * Parameters that define how a mutation is written to HBase; see the {@code addMutation}
+ * methods of {@link HBaseTableClient}.
+ *
+ * <p>Defaults to {@link Durability#USE_DEFAULT} and a time-to-live of 0. A null passed to
+ * either {@code with...} method restores the corresponding default, so the getters never
+ * return null.
+ */
+public class HBaseWriterParams {
+  private Durability durability;
+  private Long timeToLiveMillis;
+
+  public HBaseWriterParams() {
+    durability = Durability.USE_DEFAULT;
+    timeToLiveMillis = 0L;
+  }
+
+  /**
+   * @param durability The durability to write with; null means {@link Durability#USE_DEFAULT}.
+   * @return This instance, to allow chaining.
+   */
+  public HBaseWriterParams withDurability(Durability durability) {
+    if(durability == null) {
+      this.durability = Durability.USE_DEFAULT;
+    } else {
+      this.durability = durability;
+    }
+    return this;
+  }
+
+  /**
+   * @param timeToLiveMillis The time-to-live in milliseconds; null means 0.
+   * @return This instance, to allow chaining.
+   */
+  public HBaseWriterParams withTimeToLive(Long timeToLiveMillis) {
+    // callers compare the TTL against 0, which would throw on unboxing a null
+    if(timeToLiveMillis == null) {
+      this.timeToLiveMillis = 0L;
+    } else {
+      this.timeToLiveMillis = timeToLiveMillis;
+    }
+    return this;
+  }
+
+  /**
+   * @return The durability; never null.
+   */
+  public Durability getDurability() {
+    return durability;
+  }
+
+  /**
+   * @return The time-to-live in milliseconds; never null.
+   */
+  public Long getTimeToLiveMillis() {
+    return timeToLiveMillis;
+  }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/LegacyHBaseClient.java
similarity index 98%
copy from metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
copy to metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/LegacyHBaseClient.java
index d0d934e..d3f22c0 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/LegacyHBaseClient.java
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
/**
* A client that interacts with HBase.
*/
-public class HBaseClient implements Closeable {
+public class LegacyHBaseClient implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -66,7 +66,7 @@ public class HBaseClient implements Closeable {
*/
private HTableInterface table;
- public HBaseClient(TableProvider provider, final Configuration configuration, final String tableName) {
+ public LegacyHBaseClient(TableProvider provider, final Configuration configuration, final String tableName) {
this.mutations = new ArrayList<>();
this.gets = new ArrayList<>();
try {
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
index 1983fc7..f9a9f1f 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
@@ -63,7 +63,7 @@ public class HBaseClientTest {
private static final String tableName = "table";
private static HBaseTestingUtility util;
- private static HBaseClient client;
+ private static LegacyHBaseClient client;
private static HTableInterface table;
private static Admin admin;
private static byte[] cf = Bytes.toBytes("cf");
@@ -87,7 +87,7 @@ public class HBaseClientTest {
table = util.createTable(Bytes.toBytes(tableName), cf);
util.waitTableEnabled(table.getName());
// setup the client
- client = new HBaseClient((c,t) -> table, table.getConfiguration(), tableName);
+ client = new LegacyHBaseClient((c, t) -> table, table.getConfiguration(), tableName);
}
@AfterClass
@@ -259,7 +259,7 @@ public class HBaseClientTest {
TableProvider tableProvider = mock(TableProvider.class);
when(tableProvider.getTable(any(), any())).thenThrow(new IllegalArgumentException("test exception"));
- client = new HBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
+ client = new LegacyHBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
}
@Test(expected = RuntimeException.class)
@@ -271,7 +271,7 @@ public class HBaseClientTest {
TableProvider tableProvider = mock(TableProvider.class);
when(tableProvider.getTable(any(), any())).thenReturn(table);
- client = new HBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
+ client = new LegacyHBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
client.addMutation(rowKey1, cols1, Durability.SYNC_WAL);
client.mutate();
}
@@ -288,7 +288,7 @@ public class HBaseClientTest {
HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
criteria.addColumnFamily(Bytes.toString(cf));
- client = new HBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
+ client = new LegacyHBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
client.addGet(rowKey1, criteria);
client.addGet(rowKey2, criteria);
client.getAll();
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/integration/HBaseTableClientIntegrationTest.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/integration/HBaseTableClientIntegrationTest.java
new file mode 100644
index 0000000..d9db3cf
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/integration/HBaseTableClientIntegrationTest.java
@@ -0,0 +1,286 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.client.integration;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.ColumnList;
+import org.apache.metron.hbase.HBaseProjectionCriteria;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.hbase.client.HBaseTableClient;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * An integration test for the {@link HBaseTableClient}.
+ *
+ * <p>A mini HBase cluster and a single table are started once for the class. Each test gets
+ * a fresh client, and all rows are deleted from the table after each test.
+ */
+public class HBaseTableClientIntegrationTest {
+  private static final String tableName = "widgets";
+  private static final String columnFamily = "W";
+  private static final byte[] columnFamilyB = Bytes.toBytes(columnFamily);
+  private static final String columnQualifier = "column";
+  private static final byte[] columnQualifierB = Bytes.toBytes(columnQualifier);
+  private static final String rowKey1String = "row-key-1";
+  private static final byte[] rowKey1 = Bytes.toBytes(rowKey1String);
+  private static final String rowKey2String = "row-key-2";
+  private static final byte[] rowKey2 = Bytes.toBytes(rowKey2String);
+  private static HBaseTestingUtility util;
+  private static Table table;
+  private HBaseTableClient client;
+
+  @BeforeClass
+  public static void startHBase() throws Exception {
+    Configuration config = HBaseConfiguration.create();
+    config.set("hbase.master.hostname", "localhost");
+    config.set("hbase.regionserver.hostname", "localhost");
+
+    util = new HBaseTestingUtility(config);
+    util.startMiniCluster();
+
+    // create the table
+    table = util.createTable(TableName.valueOf(tableName), columnFamily);
+    util.waitTableEnabled(table.getName());
+  }
+
+  @AfterClass
+  public static void stopHBase() throws Exception {
+    util.deleteTable(table.getName());
+    util.shutdownMiniCluster();
+    util.cleanupTestDir();
+  }
+
+  @Before
+  public void setup() throws IOException {
+    client = new HBaseTableClient(new HBaseConnectionFactory(), util.getConfiguration(), tableName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      // delete all records in the table
+      List<Delete> deletions = new ArrayList<>();
+      try (ResultScanner scanner = table.getScanner(new Scan())) {
+        for(Result r : scanner) {
+          deletions.add(new Delete(r.getRow()));
+        }
+      }
+      table.delete(deletions);
+
+    } finally {
+      // always release the client's connection, even if the cleanup above fails
+      if(client != null) {
+        client.close();
+      }
+    }
+  }
+
+  @Test
+  public void testMutate() throws Exception {
+    // write some values
+    ColumnList columns = new ColumnList()
+            .addColumn(columnFamily, columnQualifier, "value1");
+    client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+    client.mutate();
+
+    // read back the value
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Result[] results = client.getAll();
+
+    // validate
+    assertEquals(1, results.length);
+    assertEquals("value1", getValue(results[0], columnFamily, columnQualifier));
+  }
+
+  @Test
+  public void testMutateMultipleColumns() throws Exception {
+    // write some values
+    ColumnList columns = new ColumnList()
+            .addColumn(columnFamily, "col1", "value1")
+            .addColumn(columnFamily, "col2", "value2");
+    client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+    client.mutate();
+
+    // read back the value
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Result[] results = client.getAll();
+
+    // validate
+    assertEquals(1, results.length);
+    assertEquals("value1", getValue(results[0], columnFamily, "col1"));
+    assertEquals("value2", getValue(results[0], columnFamily, "col2"));
+  }
+
+  @Test
+  public void testNoMutations() throws Exception {
+    // do not add any mutations before attempting to write
+    int count = client.mutate();
+    Assert.assertEquals(0, count);
+
+    // attempt to read
+    HBaseProjectionCriteria criteria = new HBaseProjectionCriteria().addColumnFamily(columnFamily);
+    client.addGet(rowKey1, criteria);
+    client.addGet(rowKey2, criteria);
+    Result[] results = client.getAll();
+
+    // nothing should have been read
+    assertEquals(2, results.length);
+    for(Result result : results) {
+      Assert.assertTrue(result.isEmpty());
+    }
+  }
+
+  @Test
+  public void testScan() throws Exception {
+    // write some values
+    client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+    client.mutate();
+
+    // scan the table
+    Result[] results = client.scan(10);
+    assertEquals(1, results.length);
+
+    assertArrayEquals(rowKey1, results[0].getRow());
+    String actual1 = Bytes.toString(results[0].getValue(columnFamilyB, columnQualifierB));
+    assertEquals("value1", actual1);
+  }
+
+  @Test
+  public void testScanLimit() throws Exception {
+    // write some values
+    client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+    client.addMutation(rowKey2, new ColumnList().addColumn(columnFamily, columnQualifier, "value2"), Durability.SKIP_WAL);
+    client.mutate();
+
+    // scan the table, but limit to 1 result
+    Result[] results = client.scan(1);
+    assertEquals(1, results.length);
+  }
+
+  @Test
+  public void testScanNothing() throws Exception {
+    // scan the table, but there is nothing there
+    Result[] results = client.scan(1);
+    assertEquals(0, results.length);
+  }
+
+  @Test
+  public void testScanRowKeys() throws Exception {
+    // write some values
+    client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+    client.addMutation(rowKey2, new ColumnList().addColumn(columnFamily, columnQualifier, "value2"), Durability.SKIP_WAL);
+    client.mutate();
+
+    // scan the table
+    List<String> rowKeys = client.scanRowKeys();
+    List<String> expected = Arrays.asList(rowKey1String, rowKey2String);
+    assertEquals(new HashSet<>(expected), new HashSet<>(rowKeys));
+  }
+
+  @Test
+  public void testDelete() {
+    // write some values
+    client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+    client.addMutation(rowKey2, new ColumnList().addColumn(columnFamily, columnQualifier, "value2"), Durability.SKIP_WAL);
+    client.mutate();
+
+    client.delete(rowKey1);
+
+    // the deleted row key should no longer exist
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Assert.assertTrue(client.getAll()[0].isEmpty());
+
+    // the other row key should remain
+    client.addGet(rowKey2, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Assert.assertFalse(client.getAll()[0].isEmpty());
+  }
+
+  @Test
+  public void testDeleteNothing() {
+    // nothing should blow-up if we attempt to delete something that does not exist
+    client.delete(rowKey1);
+  }
+
+  @Test
+  public void testDeleteColumn() {
+    // write some values
+    ColumnList columns = new ColumnList()
+            .addColumn(columnFamily, "col1", "value1")
+            .addColumn(columnFamily, "col2", "value2");
+    client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+    client.mutate();
+
+    // delete a column
+    client.delete(rowKey1, new ColumnList().addColumn(columnFamily, "col1"));
+
+    // read back the value
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Result[] results = client.getAll();
+
+    // validate
+    assertEquals(1, results.length);
+    assertNull(getValue(results[0], columnFamily, "col1"));
+    assertEquals("value2", getValue(results[0], columnFamily, "col2"));
+  }
+
+  @Test
+  public void testDeleteAllColumns() {
+    // write some values
+    ColumnList columns = new ColumnList()
+            .addColumn(columnFamily, "col1", "value1")
+            .addColumn(columnFamily, "col2", "value2");
+    client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+    client.mutate();
+
+    // delete both columns individually
+    client.delete(rowKey1, new ColumnList().addColumn(columnFamily, "col1"));
+    client.delete(rowKey1, new ColumnList().addColumn(columnFamily, "col2"));
+
+    // read back the value
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Result[] results = client.getAll();
+
+    // validate
+    assertEquals(1, results.length);
+    assertNull(getValue(results[0], columnFamily, "col1"));
+    assertNull(getValue(results[0], columnFamily, "col2"));
+  }
+
+  /**
+   * Reads a single cell from a result as a String.
+   */
+  private String getValue(Result result, String columnFamily, String columnQualifier) {
+    byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier));
+    return Bytes.toString(value);
+  }
+}
diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml
index c839285..ffa7f3d 100644
--- a/metron-platform/metron-pcap/pom.xml
+++ b/metron-platform/metron-pcap/pom.xml
@@ -194,5 +194,10 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${global_httpclient_version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/pom.xml b/pom.xml
index 66a935e..f8c56ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,7 +91,6 @@
<base_flux_version>1.0.1</base_flux_version>
<base_kafka_version>0.10.0</base_kafka_version>
<base_hadoop_version>2.7.1</base_hadoop_version>
- <base_hbase_version>1.1.1</base_hbase_version>
<base_flume_version>1.5.2</base_flume_version>
<!-- full dependency versions -->
<global_accumulo_version>1.8.0</global_accumulo_version>
@@ -106,14 +105,12 @@
<global_pcap_version>1.7.1</global_pcap_version>
<global_kafka_version>0.10.0.1</global_kafka_version>
<global_hadoop_version>${base_hadoop_version}</global_hadoop_version>
- <global_hbase_version>${base_hbase_version}</global_hbase_version>
<global_flume_version>${base_flume_version}</global_flume_version>
<global_elasticsearch_version>5.6.14</global_elasticsearch_version>
<global_json_simple_version>1.1.1</global_json_simple_version>
<global_metrics_version>3.0.2</global_metrics_version>
<global_junit_version>4.12</global_junit_version>
<global_guava_version>17.0</global_guava_version>
- <global_hbase_guava_version>12.0</global_hbase_guava_version>
<global_json_schema_validator_version>2.2.5</global_json_schema_validator_version>
<global_slf4j_version>1.7.7</global_slf4j_version>
<global_opencsv_version>3.7</global_opencsv_version>
@@ -139,9 +136,27 @@
<global_jacoco_version>0.8.3</global_jacoco_version>
<argLine></argLine>
</properties>
-
<profiles>
<profile>
+ <!-- HDP 3.1 build: sets hdp_version to 3.1.0.0, global_hbase_version to 2.0.2
+ and global_hbase_guava_version to 17.0. This profile declares no activation
+ element, so it only applies when selected explicitly. -->
+ <id>HDP-3.1</id>
+ <properties>
+ <hdp_version>3.1.0.0</hdp_version>
+ <global_hbase_version>2.0.2</global_hbase_version>
+ <global_hbase_guava_version>17.0</global_hbase_guava_version>
+ </properties>
+ </profile>
+ <profile>
+ <!-- HDP 2.6 build, active by default: sets hdp_version to 2.6.5.0,
+ global_hbase_version to 1.1.1 and global_hbase_guava_version to 12.0.
+ NOTE(review): per the Maven build-profile documentation, an activeByDefault
+ profile is deactivated when another profile in the same POM is activated.
+ These two HBase properties are no longer defined in the top-level
+ properties, so every other profile in this POM must define them itself;
+ confirm this when adding a new profile. -->
+ <id>HDP-2.6</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <properties>
+ <hdp_version>2.6.5.0</hdp_version>
+ <global_hbase_version>1.1.1</global_hbase_version>
+ <global_hbase_guava_version>12.0</global_hbase_guava_version>
+ </properties>
+ </profile>
+ <profile>
<id>HDP-2.5.0.0</id>
<properties>
<hdp_version>2.5.0.0</hdp_version>
@@ -149,10 +164,11 @@
<global_storm_kafka_version>1.2.2</global_storm_kafka_version>
<global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version>
<global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version>
+ <global_hbase_version>1.1.1</global_hbase_version>
+ <global_hbase_guava_version>12.0</global_hbase_guava_version>
</properties>
</profile>
</profiles>
-
<dependencyManagement>
<dependencies>
<dependency>