You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/07/23 13:06:55 UTC
[metron] branch feature/METRON-2088-support-hdp-3.1 updated:
METRON-2176 Upgrade REST for HBase 2.0.2 (nickwallen) closes
apache/metron#1457
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push:
new 87e2a4d METRON-2176 Upgrade REST for HBase 2.0.2 (nickwallen) closes apache/metron#1457
87e2a4d is described below
commit 87e2a4dcb1c004be1110b6366e76a898af2bb11d
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Tue Jul 23 09:06:24 2019 -0400
METRON-2176 Upgrade REST for HBase 2.0.2 (nickwallen) closes apache/metron#1457
---
.../apache/metron/rest/MetronRestConstants.java | 1 -
.../org/apache/metron/rest/config/HBaseConfig.java | 111 ++++---
.../org/apache/metron/rest/config/IndexConfig.java | 79 ++---
.../metron/rest/service/AlertsUIService.java | 2 +-
.../rest/service/impl/AlertsUIServiceImpl.java | 13 +-
.../metron/rest/user/HBaseUserSettingsClient.java | 218 ++++++++++++++
.../metron/rest/user/UserSettingsClient.java | 229 +++++---------
.../src/main/resources/application-test.yml | 4 +-
.../apache/metron/rest/config/HBaseConfigTest.java | 86 ++++--
.../org/apache/metron/rest/config/TestConfig.java | 85 ++++--
.../AlertsUIControllerIntegrationTest.java | 4 +
.../metron/rest/controller/DaoControllerTest.java | 3 +-
...rEnrichmentConfigControllerIntegrationTest.java | 155 +++++-----
.../UpdateControllerIntegrationTest.java | 16 -
.../rest/service/impl/AlertsUIServiceImplTest.java | 167 +++++-----
.../rest/user/HBaseUserSettingsClientTest.java | 194 ++++++++++++
.../metron/rest/user/UserSettingsClientTest.java | 99 ------
.../metron/elasticsearch/dao/ElasticsearchDao.java | 5 +
.../dao/ElasticsearchMetaAlertDao.java | 5 +
.../dao/ElasticsearchMetaAlertDaoTest.java | 4 +
.../ElasticsearchUpdateIntegrationTest.java | 5 -
.../java/org/apache/metron/hbase/TableConfig.java | 5 +-
.../metron/hbase/client/FakeHBaseClient.java | 335 +++++++++++++++++++++
.../hbase/client/FakeHBaseClientFactory.java | 43 +++
.../metron/hbase/client/FakeHBaseClientTest.java | 268 +++++++++++++++++
.../hbase/client/FakeHBaseConnectionFactory.java | 89 ++++++
.../metron-indexing/metron-indexing-common/pom.xml | 28 ++
.../apache/metron/indexing/dao/AccessConfig.java | 32 +-
.../org/apache/metron/indexing/dao/HBaseDao.java | 126 +++++---
.../org/apache/metron/indexing/dao/IndexDao.java | 4 +-
.../apache/metron/indexing/dao/MultiIndexDao.java | 7 +
.../apache/metron/indexing/dao/InMemoryDao.java | 5 +
.../metron/indexing/dao/InMemoryMetaAlertDao.java | 6 +
.../metron/indexing/dao/UpdateIntegrationTest.java | 84 +++++-
.../integration/HBaseDaoIntegrationTest.java | 89 ++++--
.../java/org/apache/metron/solr/dao/SolrDao.java | 5 +
.../apache/metron/solr/dao/SolrMetaAlertDao.java | 10 +
.../org/apache/metron/solr/dao/SolrUtilities.java | 5 +-
.../metron/solr/dao/SolrMetaAlertDaoTest.java | 4 +
.../apache/metron/solr/dao/SolrSearchDaoTest.java | 22 +-
.../apache/metron/solr/dao/SolrUtilitiesTest.java | 5 +-
.../SolrRetrieveLatestIntegrationTest.java | 22 +-
.../integration/SolrUpdateIntegrationTest.java | 54 ++--
43 files changed, 2018 insertions(+), 715 deletions(-)
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
index 54f721c..0e79ee6 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
@@ -68,7 +68,6 @@ public class MetronRestConstants {
public static final String SEARCH_MAX_GROUPS = "search.max.groups";
public static final String SEARCH_FACET_FIELDS_SPRING_PROPERTY = "search.facet.fields";
public static final String INDEX_DAO_IMPL = "index.dao.impl";
- public static final String INDEX_HBASE_TABLE_PROVIDER_IMPL = "index.hbase.provider";
public static final String INDEX_WRITER_NAME = "index.writer.name";
public static final String META_DAO_IMPL = "meta.dao.impl";
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java
index c1476e8..44bcb12 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java
@@ -17,66 +17,97 @@
*/
package org.apache.metron.rest.config;
-import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.Map;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.HBaseClientFactory;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.hbase.client.HBaseTableClient;
+import org.apache.metron.hbase.client.HBaseTableClientFactory;
import org.apache.metron.hbase.client.LegacyHBaseClient;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.metron.rest.user.HBaseUserSettingsClient;
import org.apache.metron.rest.user.UserSettingsClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+
@Configuration
@Profile("!" + TEST_PROFILE)
public class HBaseConfig {
- @Autowired
- private GlobalConfigService globalConfigService;
+ @Autowired
+ private GlobalConfigService globalConfigService;
- @Autowired
- public HBaseConfig(GlobalConfigService globalConfigService) {
- this.globalConfigService = globalConfigService;
+ private Supplier<Map<String, Object>> globals = () -> {
+ try {
+ return globalConfigService.get();
+ } catch (RestException e) {
+ throw new IllegalStateException("Unable to retrieve the global config.", e);
}
+ };
- @Bean()
- public UserSettingsClient userSettingsClient() {
- UserSettingsClient userSettingsClient = new UserSettingsClient();
- userSettingsClient.init(() -> {
- try {
- return globalConfigService.get();
- } catch (RestException e) {
- throw new IllegalStateException("Unable to retrieve the global config.", e);
- }
- }, new HTableProvider());
- return userSettingsClient;
- }
+ @Autowired
+ public HBaseConfig(GlobalConfigService globalConfigService) {
+ this.globalConfigService = globalConfigService;
+ }
- @Bean()
- public LegacyHBaseClient hBaseClient() {
- Map<String, Object> restConfig = null;
- try {
- restConfig = globalConfigService.get();
- } catch (RestException e) {
- throw new IllegalStateException("Unable to retrieve the global config.", e);
- }
- TableProvider provider = null;
- try {
- provider = TableProvider
- .create((String) restConfig.get(EnrichmentConfigurations.TABLE_PROVIDER),
- HTableProvider::new);
- } catch (ClassNotFoundException | InstantiationException | InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
- throw new IllegalStateException("Unable to create table provider", e);
- }
- return new LegacyHBaseClient(provider, HBaseConfiguration.create(),
- (String) restConfig.get(EnrichmentConfigurations.TABLE_NAME));
- }
+ @Bean
+ public HBaseConnectionFactory hBaseConnectionFactory() {
+ return new HBaseConnectionFactory();
+ }
+
+ @Bean
+ org.apache.hadoop.conf.Configuration hBaseConfiguration() {
+ return HBaseConfiguration.create();
+ }
+ @Bean
+ HBaseClientFactory hBaseClientFactory() {
+ return new HBaseTableClientFactory();
+ }
+
+ @Bean(destroyMethod = "close")
+ public UserSettingsClient userSettingsClient(GlobalConfigService globalConfigService,
+ HBaseClientFactory hBaseClientFactory,
+ HBaseConnectionFactory hBaseConnectionFactory,
+ org.apache.hadoop.conf.Configuration hBaseConfiguration) {
+ UserSettingsClient userSettingsClient = new HBaseUserSettingsClient(
+ globals,
+ hBaseClientFactory,
+ hBaseConnectionFactory,
+ hBaseConfiguration);
+ userSettingsClient.init();
+ return userSettingsClient;
+ }
+
+ @Bean()
+ public LegacyHBaseClient legacyHBaseClient() {
+ Map<String, Object> restConfig = null;
+ try {
+ restConfig = globalConfigService.get();
+ } catch (RestException e) {
+ throw new IllegalStateException("Unable to retrieve the global config.", e);
+ }
+ TableProvider provider = null;
+ try {
+ provider = TableProvider
+ .create((String) restConfig.get(EnrichmentConfigurations.TABLE_PROVIDER),
+ HTableProvider::new);
+ } catch (ClassNotFoundException | InstantiationException | InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
+ throw new IllegalStateException("Unable to create table provider", e);
+ }
+ return new LegacyHBaseClient(provider, HBaseConfiguration.create(),
+ (String) restConfig.get(EnrichmentConfigurations.TABLE_NAME));
+ }
}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
index 53b10f9..5ae8326 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
@@ -17,13 +17,8 @@
*/
package org.apache.metron.rest.config;
-import static org.apache.metron.rest.MetronRestConstants.INDEX_DAO_IMPL;
-import static org.apache.metron.rest.MetronRestConstants.INDEX_WRITER_NAME;
-
-import java.util.Optional;
import org.apache.metron.common.zookeeper.ConfigurationsCache;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.IndexDaoFactory;
@@ -37,6 +32,11 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
+import java.util.Optional;
+
+import static org.apache.metron.rest.MetronRestConstants.INDEX_DAO_IMPL;
+import static org.apache.metron.rest.MetronRestConstants.INDEX_WRITER_NAME;
+
@Configuration
public class IndexConfig {
@@ -55,50 +55,55 @@ public class IndexConfig {
}
@Bean
- public IndexDao indexDao() {
+ public AccessConfig accessConfig(HBaseConnectionFactory hBaseConnectionFactory,
+ org.apache.hadoop.conf.Configuration hBaseConfiguration) {
+ int searchMaxResults = environment.getProperty(MetronRestConstants.SEARCH_MAX_RESULTS, Integer.class, 1000);
+ int searchMaxGroups = environment.getProperty(MetronRestConstants.SEARCH_MAX_GROUPS, Integer.class, 1000);
+
+ AccessConfig config = new AccessConfig();
+ config.setHbaseConnectionFactory(hBaseConnectionFactory);
+ config.setHbaseConfiguration(hBaseConfiguration);
+ config.setMaxSearchResults(searchMaxResults);
+ config.setMaxSearchGroups(searchMaxGroups);
+ config.setGlobalConfigSupplier(() -> {
+ try {
+ return globalConfigService.get();
+ } catch (RestException e) {
+ throw new IllegalStateException("Unable to retrieve the global config.", e);
+ }
+ });
+ config.setIndexSupplier(IndexingCacheUtil.getIndexLookupFunction(cache, environment.getProperty(INDEX_WRITER_NAME)));
+ config.setKerberosEnabled(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false));
+ return config;
+ }
+
+ @Bean(destroyMethod = "close")
+ public IndexDao indexDao(AccessConfig accessConfig) {
try {
- String hbaseProviderImpl = environment.getProperty(MetronRestConstants.INDEX_HBASE_TABLE_PROVIDER_IMPL, String.class, null);
String indexDaoImpl = environment.getProperty(MetronRestConstants.INDEX_DAO_IMPL, String.class, null);
- int searchMaxResults = environment.getProperty(MetronRestConstants.SEARCH_MAX_RESULTS, Integer.class, 1000);
- int searchMaxGroups = environment.getProperty(MetronRestConstants.SEARCH_MAX_GROUPS, Integer.class, 1000);
- String metaDaoImpl = environment.getProperty(MetronRestConstants.META_DAO_IMPL, String.class, null);
- String metaDaoSort = environment.getProperty(MetronRestConstants.META_DAO_SORT, String.class, null);
-
- AccessConfig config = new AccessConfig();
- config.setMaxSearchResults(searchMaxResults);
- config.setMaxSearchGroups(searchMaxGroups);
- config.setGlobalConfigSupplier(() -> {
- try {
- return globalConfigService.get();
- } catch (RestException e) {
- throw new IllegalStateException("Unable to retrieve the global config.", e);
- }
- });
- config.setIndexSupplier(IndexingCacheUtil.getIndexLookupFunction(cache, environment.getProperty(INDEX_WRITER_NAME)));
- config.setTableProvider(TableProvider.create(hbaseProviderImpl, () -> new HTableProvider()));
- config.setKerberosEnabled(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false));
if (indexDaoImpl == null) {
throw new IllegalStateException("You must provide an index DAO implementation via the " + INDEX_DAO_IMPL + " config");
}
- IndexDao indexDao = IndexDaoFactory.combine(IndexDaoFactory.create(indexDaoImpl, config));
+
+ IndexDao indexDao = IndexDaoFactory.combine(IndexDaoFactory.create(indexDaoImpl, accessConfig));
if (indexDao == null) {
throw new IllegalStateException("IndexDao is unable to be created.");
}
+
+ String metaDaoImpl = environment.getProperty(MetronRestConstants.META_DAO_IMPL, String.class, null);
if (metaDaoImpl == null) {
// We're not using meta alerts.
return indexDao;
- }
- // Create the meta alert dao and wrap it around the index dao.
- MetaAlertDao ret = (MetaAlertDao) IndexDaoFactory.create(metaDaoImpl, config).get(0);
- ret.init(indexDao, Optional.ofNullable(metaDaoSort));
- return ret;
+ } else {
+ // Create the meta alert dao and wrap it around the index dao.
+ String metaDaoSort = environment.getProperty(MetronRestConstants.META_DAO_SORT, String.class, null);
+ MetaAlertDao metaAlertDao = (MetaAlertDao) IndexDaoFactory.create(metaDaoImpl, accessConfig).get(0);
+ metaAlertDao.init(indexDao, Optional.ofNullable(metaDaoSort));
+ return metaAlertDao;
+ }
- }
- catch(RuntimeException re) {
- throw re;
- }
- catch(Exception e) {
+ } catch(Exception e) {
throw new IllegalStateException("Unable to create index DAO: " + e.getMessage(), e);
}
}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/AlertsUIService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/AlertsUIService.java
index cd9ff9d..3b15879 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/AlertsUIService.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/AlertsUIService.java
@@ -37,5 +37,5 @@ public interface AlertsUIService {
void saveAlertsUIUserSettings(AlertsUIUserSettings userSettings) throws RestException;
- boolean deleteAlertsUIUserSettings(String user);
+ boolean deleteAlertsUIUserSettings(String user) throws RestException;
}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertsUIServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertsUIServiceImpl.java
index 6092ce2..9854f83 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertsUIServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertsUIServiceImpl.java
@@ -126,7 +126,7 @@ public class AlertsUIServiceImpl implements AlertsUIService {
}
@Override
- public void saveAlertsUIUserSettings(AlertsUIUserSettings alertsUIUserSettings) throws RestException{
+ public void saveAlertsUIUserSettings(AlertsUIUserSettings alertsUIUserSettings) throws RestException {
String user = SecurityUtils.getCurrentUser();
try {
userSettingsClient.save(user, ALERT_USER_SETTING_TYPE, _mapper.get().writeValueAsString(alertsUIUserSettings));
@@ -136,12 +136,15 @@ public class AlertsUIServiceImpl implements AlertsUIService {
}
@Override
- public boolean deleteAlertsUIUserSettings(String user) {
- boolean success = true;
+ public boolean deleteAlertsUIUserSettings(String user) throws RestException {
+ boolean success = false;
try {
- userSettingsClient.delete(user, ALERT_USER_SETTING_TYPE);
+ if(userSettingsClient.findOne(user, ALERT_USER_SETTING_TYPE).isPresent()) {
+ userSettingsClient.delete(user, ALERT_USER_SETTING_TYPE);
+ success = true;
+ }
} catch (IOException e) {
- success = false;
+ throw new RestException(e);
}
return success;
}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/user/HBaseUserSettingsClient.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/user/HBaseUserSettingsClient.java
new file mode 100644
index 0000000..e5432c3
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/user/HBaseUserSettingsClient.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.user;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.ColumnList;
+import org.apache.metron.hbase.HBaseProjectionCriteria;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.HBaseClientFactory;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * A {@link UserSettingsClient} that interacts with user settings persisted in HBase.
+ */
+public class HBaseUserSettingsClient implements UserSettingsClient {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static final String USER_SETTINGS_HBASE_TABLE = "user.settings.hbase.table";
+ public static final String USER_SETTINGS_HBASE_CF = "user.settings.hbase.cf";
+ public static final String USER_SETTINGS_MAX_SCAN = "user.settings.max.scan";
+ private static final int DEFAULT_MAX_SCAN = 100_000;
+
+ private String columnFamily;
+ private Supplier<Map<String, Object>> globalConfigSupplier;
+ private HBaseConnectionFactory hBaseConnectionFactory;
+ private Configuration hBaseConfiguration;
+ private HBaseClientFactory hBaseClientFactory;
+ private HBaseClient hBaseClient;
+ private int maxScanCount;
+
+ public HBaseUserSettingsClient(Supplier<Map<String, Object>> globalConfigSupplier,
+ HBaseClientFactory hBaseClientFactory,
+ HBaseConnectionFactory hBaseConnectionFactory,
+ Configuration hBaseConfiguration) {
+ this.globalConfigSupplier = globalConfigSupplier;
+ this.hBaseClientFactory = hBaseClientFactory;
+ this.hBaseConnectionFactory = hBaseConnectionFactory;
+ this.hBaseConfiguration = hBaseConfiguration;
+ }
+
+ @Override
+ public synchronized void init() {
+ if (hBaseClient == null) {
+ Map<String, Object> globals = getGlobals();
+ columnFamily = getColumnFamily(globals);
+ maxScanCount = getMaxScanCount(globals);
+ hBaseClient = hBaseClientFactory.create(hBaseConnectionFactory, hBaseConfiguration, getTableName(globals));
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if(hBaseClient != null) {
+ hBaseClient.close();
+ }
+ }
+
+ @Override
+ public Map<String, String> findOne(String user) throws IOException {
+ Result result = getResult(user);
+ return getAllUserSettings(result);
+ }
+
+ @Override
+ public Optional<String> findOne(String user, String type) throws IOException {
+ Result result = getResult(user);
+ return getUserSettings(result, type);
+ }
+
+ @Override
+ public Map<String, Map<String, String>> findAll() throws IOException {
+ Map<String, Map<String, String>> settings = new HashMap<>();
+ for (Result result : hBaseClient.scan(maxScanCount)) {
+ String user = new String(result.getRow());
+ settings.put(user, getAllUserSettings(result));
+ }
+ return settings;
+ }
+
+ @Override
+ public Map<String, Optional<String>> findAll(String type) throws IOException {
+ Map<String, Optional<String>> settings = new HashMap<>();
+ for (Result result : hBaseClient.scan(maxScanCount)) {
+ String user = new String(result.getRow());
+ settings.put(user, getUserSettings(result, type));
+ }
+ return settings;
+ }
+
+ @Override
+ public void save(String user, String type, String userSettings) throws IOException {
+ byte[] rowKey = userToRowKey(user);
+ ColumnList columns = new ColumnList().addColumn(columnFamily, type, userSettings);
+ hBaseClient.addMutation(rowKey, columns);
+ hBaseClient.mutate();
+ }
+
+ @Override
+ public void delete(String user) {
+ hBaseClient.delete(userToRowKey(user));
+ }
+
+ @Override
+ public void delete(String user, String type) {
+ byte[] rowKey = userToRowKey(user);
+ ColumnList columns = new ColumnList().addColumn(columnFamily, type);
+ hBaseClient.delete(rowKey, columns);
+ }
+
+ private Result getResult(String user) {
+ // for the given user's row, retrieve the column family
+ byte[] rowKey = userToRowKey(user);
+ HBaseProjectionCriteria criteria = new HBaseProjectionCriteria().addColumnFamily(columnFamily);
+ hBaseClient.addGet(rowKey, criteria);
+ Result[] results = hBaseClient.getAll();
+
+ // expect 1 result, if the user settings exist
+ Result result = null;
+ if(results.length > 0) {
+ result = results[0];
+ } else {
+ LOG.debug("No result found; user={}, columnFamily={}", user, columnFamily);
+ }
+ return result;
+ }
+
+ private byte[] userToRowKey(String user) {
+ return Bytes.toBytes(user);
+ }
+
+ private Optional<String> getUserSettings(Result result, String type) throws IOException {
+ Optional<String> userSettings = Optional.empty();
+ if (result != null) {
+ byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(type));
+ if (value != null) {
+ userSettings = Optional.of(new String(value, StandardCharsets.UTF_8));
+ }
+ }
+ return userSettings;
+ }
+
+ public Map<String, String> getAllUserSettings(Result result) {
+ if (result == null) {
+ return new HashMap<>();
+ }
+ NavigableMap<byte[], byte[]> columns = result.getFamilyMap(Bytes.toBytes(columnFamily));
+ if(columns == null || columns.size() == 0) {
+ return new HashMap<>();
+ }
+ Map<String, String> userSettingsMap = new HashMap<>();
+ for(Map.Entry<byte[], byte[]> column: columns.entrySet()) {
+ userSettingsMap.put(
+ new String(column.getKey(), StandardCharsets.UTF_8),
+ new String(column.getValue(), StandardCharsets.UTF_8));
+ }
+ return userSettingsMap;
+ }
+
+ private Map<String, Object> getGlobals() {
+ Map<String, Object> globalConfig = globalConfigSupplier.get();
+ if(globalConfig == null) {
+ throw new IllegalStateException("Cannot find the global config.");
+ }
+ return globalConfig;
+ }
+
+ private static String getTableName(Map<String, Object> globals) {
+ String table = (String) globals.get(USER_SETTINGS_HBASE_TABLE);
+ if(table == null) {
+ throw new IllegalStateException("You must configure " + USER_SETTINGS_HBASE_TABLE + "in the global config.");
+ }
+ return table;
+ }
+
+ private static String getColumnFamily(Map<String, Object> globals) {
+ String cf = (String) globals.get(USER_SETTINGS_HBASE_CF);
+ if(cf == null) {
+ throw new IllegalStateException("You must configure " + USER_SETTINGS_HBASE_CF + " in the global config.");
+ }
+ return cf;
+ }
+
+ private static int getMaxScanCount(Map<String, Object> globals) {
+ Integer maxScanCount = DEFAULT_MAX_SCAN;
+ if(globals.containsKey(USER_SETTINGS_MAX_SCAN)) {
+ String value = (String) globals.get(USER_SETTINGS_MAX_SCAN);
+ maxScanCount = Integer.valueOf(value);
+ }
+ return maxScanCount;
+ }
+}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/user/UserSettingsClient.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/user/UserSettingsClient.java
index a08b775..8efd4dd 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/user/UserSettingsClient.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/user/UserSettingsClient.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,158 +17,85 @@
*/
package org.apache.metron.rest.user;
+import java.io.Closeable;
import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
import java.util.Map;
-import java.util.NavigableMap;
import java.util.Optional;
-import java.util.function.Supplier;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.hbase.TableProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class UserSettingsClient {
-
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- public static String USER_SETTINGS_HBASE_TABLE = "user.settings.hbase.table";
- public static String USER_SETTINGS_HBASE_CF = "user.settings.hbase.cf";
-
- private HTableInterface userSettingsTable;
- private byte[] cf;
- private Supplier<Map<String, Object>> globalConfigSupplier;
- private TableProvider tableProvider;
-
- public UserSettingsClient() {
- }
-
- public UserSettingsClient(Supplier<Map<String, Object>> globalConfigSupplier, TableProvider tableProvider) {
- this.globalConfigSupplier = globalConfigSupplier;
- this.tableProvider = tableProvider;
- }
-
- public UserSettingsClient(HTableInterface userSettingsTable, byte[] cf) {
- this.userSettingsTable = userSettingsTable;
- this.cf = cf;
- }
-
- public synchronized void init(Supplier<Map<String, Object>> globalConfigSupplier, TableProvider tableProvider) {
- if (this.userSettingsTable == null) {
- Map<String, Object> globalConfig = globalConfigSupplier.get();
- if(globalConfig == null) {
- throw new IllegalStateException("Cannot find the global config.");
- }
- String table = (String)globalConfig.get(USER_SETTINGS_HBASE_TABLE);
- String cf = (String) globalConfigSupplier.get().get(USER_SETTINGS_HBASE_CF);
- if(table == null || cf == null) {
- throw new IllegalStateException("You must configure " + USER_SETTINGS_HBASE_TABLE + " and " + USER_SETTINGS_HBASE_CF + " in the global config.");
- }
- try {
- userSettingsTable = tableProvider.getTable(HBaseConfiguration.create(), table);
- this.cf = cf.getBytes();
- } catch (IOException e) {
- throw new IllegalStateException("Unable to initialize HBaseDao: " + e.getMessage(), e);
- }
-
- }
- }
-
- public HTableInterface getTableInterface() {
- if(userSettingsTable == null) {
- init(globalConfigSupplier, tableProvider);
- }
- return userSettingsTable;
- }
- public Map<String, String> findOne(String user) throws IOException {
- Result result = getResult(user);
- return getAllUserSettings(result);
- }
-
- public Optional<String> findOne(String user, String type) throws IOException {
- Result result = getResult(user);
- return getUserSettings(result, type);
- }
-
- public Map<String, Map<String, String>> findAll() throws IOException {
- Scan scan = new Scan();
- ResultScanner results = getTableInterface().getScanner(scan);
- Map<String, Map<String, String>> allUserSettings = new HashMap<>();
- for (Result result : results) {
- allUserSettings.put(new String(result.getRow()), getAllUserSettings(result));
- }
- return allUserSettings;
- }
-
- public Map<String, Optional<String>> findAll(String type) throws IOException {
- Scan scan = new Scan();
- ResultScanner results = getTableInterface().getScanner(scan);
- Map<String, Optional<String>> allUserSettings = new HashMap<>();
- for (Result result : results) {
- allUserSettings.put(new String(result.getRow()), getUserSettings(result, type));
- }
- return allUserSettings;
- }
-
- public void save(String user, String type, String userSettings) throws IOException {
- byte[] rowKey = Bytes.toBytes(user);
- Put put = new Put(rowKey);
- put.addColumn(cf, Bytes.toBytes(type), Bytes.toBytes(userSettings));
- getTableInterface().put(put);
- }
-
- public void delete(String user) throws IOException {
- Delete delete = new Delete(Bytes.toBytes(user));
- getTableInterface().delete(delete);
- }
-
- public void delete(String user, String type) throws IOException {
- Delete delete = new Delete(Bytes.toBytes(user));
- delete.addColumn(cf, Bytes.toBytes(type));
- getTableInterface().delete(delete);
- }
-
- private Result getResult(String user) throws IOException {
- byte[] rowKey = Bytes.toBytes(user);
- Get get = new Get(rowKey);
- get.addFamily(cf);
- return getTableInterface().get(get);
- }
-
- private Optional<String> getUserSettings(Result result, String type) throws IOException {
- Optional<String> userSettings = Optional.empty();
- if (result != null) {
- byte[] value = result.getValue(cf, Bytes.toBytes(type));
- if (value != null) {
- userSettings = Optional.of(new String(value, StandardCharsets.UTF_8));
- }
- }
- return userSettings;
- }
-
- public Map<String, String> getAllUserSettings(Result result) {
- if (result == null) {
- return new HashMap<>();
- }
- NavigableMap<byte[], byte[]> columns = result.getFamilyMap(cf);
- if(columns == null || columns.size() == 0) {
- return new HashMap<>();
- }
- Map<String, String> userSettingsMap = new HashMap<>();
- for(Map.Entry<byte[], byte[]> column: columns.entrySet()) {
- userSettingsMap.put(new String(column.getKey(), StandardCharsets.UTF_8), new String(column.getValue(), StandardCharsets.UTF_8));
- }
- return userSettingsMap;
- }
+/**
+ * A client that interacts with persisted user settings data. Enables create, read, and
+ * delete operations on user settings.
+ */
+public interface UserSettingsClient extends Closeable {
+
+ /**
+ * Initialize the client.
+ */
+ void init();
+
+ /**
+ * Finds all settings for a particular user.
+ *
+ * <p>There may be more than one type of setting associated with each user. This returns all
+ * setting types for a particular user.
+ *
+ * @param user The user.
+ * @return All of the user's settings.
+ * @throws IOException
+ */
+ Map<String, String> findOne(String user) throws IOException;
+
+ /**
+ * Finds one setting for a particular user.
+ *
+ * @param user The user.
+ * @param type The setting type.
+ * @return The value of the setting.
+ * @throws IOException
+ */
+ Optional<String> findOne(String user, String type) throws IOException;
+
+ /**
+ * Finds all settings for all users.
+ *
+ * @return All settings value for all users.
+ * @throws IOException
+ */
+ Map<String, Map<String, String>> findAll() throws IOException;
+
+ /**
+ * Finds the value of a particular setting type for all users.
+ *
+ * @param type The setting type.
+ * @return The value of the setting type for all users.
+ * @throws IOException
+ */
+ Map<String, Optional<String>> findAll(String type) throws IOException;
+
+ /**
+ * Save a user setting.
+ *
+ * @param user The user.
+ * @param type The setting type.
+ * @param value The value of the user setting.
+ * @throws IOException
+ */
+ void save(String user, String type, String value) throws IOException;
+
+ /**
+ * Delete all user settings for a particular user.
+ *
+ * @param user The user.
+ * @throws IOException
+ */
+ void delete(String user) throws IOException;
+
+ /**
+ * Delete one user setting value.
+ *
+ * @param user The user.
+ * @param type The setting type.
+ * @throws IOException
+ */
+ void delete(String user, String type) throws IOException;
}
diff --git a/metron-interface/metron-rest/src/main/resources/application-test.yml b/metron-interface/metron-rest/src/main/resources/application-test.yml
index 0e794cb..3f99311 100644
--- a/metron-interface/metron-rest/src/main/resources/application-test.yml
+++ b/metron-interface/metron-rest/src/main/resources/application-test.yml
@@ -49,8 +49,8 @@ search:
index:
dao:
- # By default, we use the InMemoryDao for our tests and HBaseDao for backing updates.
- impl: org.apache.metron.indexing.dao.InMemoryDao,org.apache.metron.indexing.dao.HBaseDao
+ # By default, we use the InMemoryDao for our tests
+ impl: org.apache.metron.indexing.dao.InMemoryDao
hbase:
# HBase is provided via a mock provider, so no actual HBase infrastructure is started.
provider: org.apache.metron.hbase.mock.MockHBaseTableProvider
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/HBaseConfigTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/HBaseConfigTest.java
index 0d45f18..80cc34f 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/HBaseConfigTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/HBaseConfigTest.java
@@ -17,22 +17,19 @@
*/
package org.apache.metron.rest.config;
-import static org.apache.metron.rest.user.UserSettingsClient.USER_SETTINGS_HBASE_CF;
-import static org.apache.metron.rest.user.UserSettingsClient.USER_SETTINGS_HBASE_TABLE;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.client.FakeHBaseClientFactory;
+import org.apache.metron.hbase.client.HBaseClientFactory;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.metron.rest.user.UserSettingsClient;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -40,42 +37,70 @@ import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.apache.metron.rest.user.HBaseUserSettingsClient.USER_SETTINGS_HBASE_CF;
+import static org.apache.metron.rest.user.HBaseUserSettingsClient.USER_SETTINGS_HBASE_TABLE;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
@RunWith(PowerMockRunner.class)
-@PrepareForTest({HTableProvider.class, HBaseConfiguration.class, HBaseConfig.class})
+@PrepareForTest({Table.class, HBaseConfiguration.class, HBaseConfig.class})
public class HBaseConfigTest {
+ private HBaseConnectionFactory hBaseConnectionFactory;
+ private HBaseClientFactory hBaseClientCreator;
+ private HBaseConfiguration hBaseConfiguration;
+ private Configuration configuration;
private GlobalConfigService globalConfigService;
private HBaseConfig hBaseConfig;
+ private Connection connection;
+ private Table table;
@Before
- public void setUp() throws Exception {
+ public void setUp() throws IOException {
+ connection = mock(Connection.class);
+ table = mock(Table.class);
+ hBaseConnectionFactory = mock(HBaseConnectionFactory.class);
+ configuration = mock(Configuration.class);
+ hBaseConfiguration = mock(HBaseConfiguration.class);
+ hBaseClientCreator = mock(FakeHBaseClientFactory.class);
globalConfigService = mock(GlobalConfigService.class);
hBaseConfig = new HBaseConfig(globalConfigService);
mockStatic(HBaseConfiguration.class);
+ when(HBaseConfiguration.create()).thenReturn(configuration);
}
@Test
- public void userSettingsTableShouldBeReturnedFromGlobalConfigByDefault() throws Exception {
+ public void userSettingsShouldBeCreated() throws Exception {
+ final String expectedTable = "hbase-table-name";
+ final String expectedColumnFamily = "hbase-column-family";
when(globalConfigService.get()).thenReturn(new HashMap<String, Object>() {{
- put(USER_SETTINGS_HBASE_TABLE, "global_config_user_settings_table");
- put(USER_SETTINGS_HBASE_CF, "global_config_user_settings_cf");
+ put(USER_SETTINGS_HBASE_TABLE, expectedTable);
+ put(USER_SETTINGS_HBASE_CF, expectedColumnFamily);
}});
- HTableProvider htableProvider = mock(HTableProvider.class);
- whenNew(HTableProvider.class).withNoArguments().thenReturn(htableProvider);
- Configuration configuration = mock(Configuration.class);
- when(HBaseConfiguration.create()).thenReturn(configuration);
- hBaseConfig.userSettingsClient();
- verify(htableProvider).getTable(configuration, "global_config_user_settings_table");
- verifyZeroInteractions(htableProvider);
- }
+ // connection factory needs to return the mock connection
+ when(hBaseConnectionFactory.createConnection(any()))
+ .thenReturn(connection);
- @Test
- public void hBaseClientShouldBeCreatedWithDefaultProvider() throws Exception {
- when(globalConfigService.get()).thenReturn(new HashMap<String, Object>() {{
- put(EnrichmentConfigurations.TABLE_NAME, "enrichment_list_hbase_table_name");
- }});
- Assert.assertNotNull(hBaseConfig.hBaseClient());
+ // connection should return the table, if the expected table name is used
+ when(connection.getTable(eq(TableName.valueOf(expectedTable))))
+ .thenReturn(table);
+
+ UserSettingsClient client = hBaseConfig.userSettingsClient(
+ globalConfigService,
+ hBaseClientCreator,
+ hBaseConnectionFactory,
+ hBaseConfiguration);
+ Assert.assertNotNull(client);
}
@Test
@@ -84,7 +109,6 @@ public class HBaseConfigTest {
put(EnrichmentConfigurations.TABLE_PROVIDER, MockHBaseTableProvider.class.getName());
put(EnrichmentConfigurations.TABLE_NAME, "enrichment_list_hbase_table_name");
}});
- Assert.assertNotNull(hBaseConfig.hBaseClient());
+ Assert.assertNotNull(hBaseConfig.legacyHBaseClient());
}
-
}
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index b3a478b..667eb26 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -17,18 +17,6 @@
*/
package org.apache.metron.rest.config;
-import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
import kafka.admin.AdminUtils$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
@@ -46,6 +34,12 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
+import org.apache.metron.hbase.client.FakeHBaseClient;
+import org.apache.metron.hbase.client.FakeHBaseClientFactory;
+import org.apache.metron.hbase.client.FakeHBaseConnectionFactory;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.HBaseClientFactory;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
import org.apache.metron.hbase.client.LegacyHBaseClient;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.integration.ComponentRunner;
@@ -64,6 +58,7 @@ import org.apache.metron.rest.service.StormStatusService;
import org.apache.metron.rest.service.impl.CachedStormStatusServiceImpl;
import org.apache.metron.rest.service.impl.PcapToPdmlScriptWrapper;
import org.apache.metron.rest.service.impl.StormCLIWrapper;
+import org.apache.metron.rest.user.HBaseUserSettingsClient;
import org.apache.metron.rest.user.UserSettingsClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -74,13 +69,31 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.web.client.RestTemplate;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+
@Configuration
@Profile(TEST_PROFILE)
public class TestConfig {
- static {
- MockHBaseTableProvider.addToCache("updates", "t");
- }
+ @Autowired
+ private HBaseConnectionFactory hBaseConnectionFactory;
+
+ @Autowired
+ private org.apache.hadoop.conf.Configuration hBaseConfiguration;
+
+ @Autowired
+ private HBaseClientFactory hBaseClientFactory;
@Bean
public Properties zkProperties() {
@@ -196,13 +209,44 @@ public class TestConfig {
return AdminUtils$.MODULE$;
}
- @Bean()
- public UserSettingsClient userSettingsClient() throws RestException, IOException {
- return new UserSettingsClient(new MockHBaseTableProvider().addToCache("user_settings", "cf"), Bytes.toBytes("cf"));
+ @Bean(destroyMethod = "close")
+ public UserSettingsClient userSettingsClient() {
+ Map<String, Object> globals = new HashMap<String, Object>() {{
+ put(HBaseUserSettingsClient.USER_SETTINGS_HBASE_TABLE, "user_settings");
+ put(HBaseUserSettingsClient.USER_SETTINGS_HBASE_CF, "cf");
+ }};
+
+ UserSettingsClient userSettingsClient = new HBaseUserSettingsClient(
+ () -> globals,
+ hBaseClientFactory,
+ hBaseConnectionFactory,
+ hBaseConfiguration);
+ userSettingsClient.init();
+ return userSettingsClient;
+ }
+
+ @Bean
+ public HBaseConnectionFactory hBaseConnectionFactory() {
+ return new FakeHBaseConnectionFactory();
+ }
+
+ @Bean
+ org.apache.hadoop.conf.Configuration hBaseConfiguration() {
+ return HBaseConfiguration.create();
+ }
+
+ @Bean
+ HBaseClientFactory hBaseClientFactory() {
+ return new FakeHBaseClientFactory();
+ }
+
+ @Bean(destroyMethod = "close")
+ public HBaseClient hBaseClient() {
+ return new FakeHBaseClient();
}
@Bean()
- public LegacyHBaseClient hBaseClient() throws RestException, IOException {
+ public LegacyHBaseClient legacyHBaseClient() throws RestException, IOException {
final String cf = "t";
final String cq = "v";
HTableInterface table = MockHBaseTableProvider.addToCache("enrichment_list", cf);
@@ -216,8 +260,7 @@ public class TestConfig {
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cq), "{}".getBytes(StandardCharsets.UTF_8));
table.put(put);
}
- return new LegacyHBaseClient(new MockHBaseTableProvider(), HBaseConfiguration.create(),
- "enrichment_list");
+ return new LegacyHBaseClient(new MockHBaseTableProvider(), HBaseConfiguration.create(), "enrichment_list");
}
@Bean
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/AlertsUIControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/AlertsUIControllerIntegrationTest.java
index 49863d6..40363f9 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/AlertsUIControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/AlertsUIControllerIntegrationTest.java
@@ -30,6 +30,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.hbase.client.FakeHBaseClient;
import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.KafkaComponent;
@@ -141,6 +142,9 @@ public class AlertsUIControllerIntegrationTest {
alertsUIService.deleteAlertsUIUserSettings(user);
}
this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build();
+
+ // ensure the fake hbase client is reset before each test
+ new FakeHBaseClient().deleteAll();
}
@Test
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/DaoControllerTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/DaoControllerTest.java
index bd3f5bd..dcc3a0d 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/DaoControllerTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/DaoControllerTest.java
@@ -30,8 +30,7 @@ import java.util.List;
import java.util.Map;
public class DaoControllerTest {
- public static final String TABLE = "updates";
- public static final String CF = "t";
+
public void loadTestData(Map<String, String> indicesToDataMap) throws ParseException {
Map<String, List<String>> backingStore = new HashMap<>();
for(Map.Entry<String, String> indices : indicesToDataMap.entrySet())
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
index d77e3ae..690ed0d 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
@@ -17,19 +17,10 @@
*/
package org.apache.metron.rest.controller;
-import static org.apache.metron.integration.utils.TestUtils.assertEventually;
-import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
-import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
-import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
-import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.ColumnList;
+import org.apache.metron.hbase.client.FakeHBaseClient;
import org.apache.metron.rest.service.SensorEnrichmentConfigService;
import org.hamcrest.core.IsCollectionContaining;
import org.junit.Before;
@@ -41,71 +32,86 @@ import org.springframework.http.MediaType;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.MvcResult;
+import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
+import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
+import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
+import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles(TEST_PROFILE)
public class SensorEnrichmentConfigControllerIntegrationTest {
/**
- {
- "enrichment": {
- "fieldMap": {
- "geo": [
- "ip_dst_addr"
- ],
- "host": [
- "ip_dst_addr"
- ],
- "hbaseEnrichment": [
- "ip_src_addr"
- ],
- "stellar": {
- "config": {
- "group1": {
- "foo": "1 + 1",
- "bar": "foo"
- },
- "group2": {
- "ALL_CAPS": "TO_UPPER(source.type)"
- }
- }
- }
- },
- "fieldToTypeMap": {
- "ip_src_addr": [
- "sample"
- ]
- }
- },
- "threatIntel": {
- "fieldMap": {
- "hbaseThreatIntel": [
- "ip_src_addr",
- "ip_dst_addr"
- ]
- },
- "fieldToTypeMap": {
- "ip_src_addr": [
- "malicious_ip"
- ],
- "ip_dst_addr": [
- "malicious_ip"
- ]
- },
- "triageConfig": {
- "riskLevelRules": [
- {
- "rule": "ip_src_addr == '10.122.196.204' or ip_dst_addr == '10.122.196.204'",
- "score": 10
- }
- ],
- "aggregator": "MAX"
- }
- }
- }
+ * {
+ * "enrichment":{
+ * "fieldMap":{
+ * "geo":[
+ * "ip_dst_addr"
+ * ],
+ * "host":[
+ * "ip_dst_addr"
+ * ],
+ * "hbaseEnrichment":[
+ * "ip_src_addr"
+ * ],
+ * "stellar":{
+ * "config":{
+ * "group1":{
+ * "foo":"1 + 1",
+ * "bar":"foo"
+ * },
+ * "group2":{
+ * "ALL_CAPS":"TO_UPPER(source.type)"
+ * }
+ * }
+ * }
+ * },
+ * "fieldToTypeMap":{
+ * "ip_src_addr":[
+ * "sample"
+ * ]
+ * }
+ * },
+ * "threatIntel":{
+ * "fieldMap":{
+ * "hbaseThreatIntel":[
+ * "ip_src_addr",
+ * "ip_dst_addr"
+ * ]
+ * },
+ * "fieldToTypeMap":{
+ * "ip_src_addr":[
+ * "malicious_ip"
+ * ],
+ * "ip_dst_addr":[
+ * "malicious_ip"
+ * ]
+ * },
+ * "triageConfig":{
+ * "riskLevelRules":[
+ * {
+ * "rule":"ip_src_addr == '10.122.196.204' or ip_dst_addr == '10.122.196.204'",
+ * "score":10
+ * }
+ * ],
+ * "aggregator":"MAX"
+ * }
+ * }
+ * }
+ *
*/
@Multiline
public static String broJson;
@@ -117,7 +123,6 @@ public class SensorEnrichmentConfigControllerIntegrationTest {
private WebApplicationContext wac;
private MockMvc mockMvc;
-
private String sensorEnrichmentConfigUrl = "/api/v1/sensor/enrichment/config";
private String user = "user";
private String password = "password";
@@ -125,6 +130,9 @@ public class SensorEnrichmentConfigControllerIntegrationTest {
@Before
public void setup() throws Exception {
this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build();
+
+ // clear any fake hbase data between tests
+ new FakeHBaseClient().deleteAll();
}
@Test
@@ -242,12 +250,6 @@ public class SensorEnrichmentConfigControllerIntegrationTest {
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
.andExpect(jsonPath("$[?(@.sensorTopic == 'broTest')]").doesNotExist());
- this.mockMvc.perform(get(sensorEnrichmentConfigUrl + "/list/available/enrichments").with(httpBasic(user,password)))
- .andExpect(status().isOk())
- .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
- .andExpect(jsonPath("$.length()").value("3"))
- .andExpect(jsonPath("$.*").value(IsCollectionContaining.hasItems("foo", "bar", "baz")));
-
this.mockMvc.perform(get(sensorEnrichmentConfigUrl + "/list/available/threat/triage/aggregators").with(httpBasic(user,password)))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
@@ -255,8 +257,7 @@ public class SensorEnrichmentConfigControllerIntegrationTest {
.andExpect(jsonPath("$[1]").value("MIN"))
.andExpect(jsonPath("$[2]").value("SUM"))
.andExpect(jsonPath("$[3]").value("MEAN"))
- .andExpect(jsonPath("$[4]").value("POSITIVE_MEAN"))
- ;
+ .andExpect(jsonPath("$[4]").value("POSITIVE_MEAN"));
sensorEnrichmentConfigService.delete("broTest");
}
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
index 7995557..47fd9d9 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
@@ -22,8 +22,6 @@ import org.adrianwalker.multilinestring.Multiline;
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.indexing.dao.HBaseDao;
import org.apache.metron.indexing.dao.SearchIntegrationTest;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
@@ -162,10 +160,6 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest {
.andExpect(jsonPath("$.project").doesNotExist())
.andExpect(jsonPath("$.timestamp").value(2));
- // nothing is recorded in HBase
- MockHTable table = (MockHTable) MockHBaseTableProvider.getFromCache(TABLE);
- Assert.assertEquals(0,table.size());
-
// patch the document
this.mockMvc.perform(patchRequest)
.andExpect(status().isOk());
@@ -178,16 +172,6 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest {
.andExpect(jsonPath("$.guid").value(guid))
.andExpect(jsonPath("$.project").value("metron"))
.andExpect(jsonPath("$.timestamp").value(2));
-
- // the change should be recorded in HBase
- Assert.assertEquals(1,table.size());
- {
- //ensure hbase is up to date
- Get g = new Get(new HBaseDao.Key(guid,"bro").toBytes());
- Result r = table.get(g);
- NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
- Assert.assertEquals(1, columns.size());
- }
}
@Test
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertsUIServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertsUIServiceImplTest.java
index 016a08b..ffaae1e 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertsUIServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertsUIServiceImplTest.java
@@ -17,30 +17,17 @@
*/
package org.apache.metron.rest.service.impl;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.metron.common.system.FakeClock;
+import org.apache.metron.hbase.client.FakeHBaseClientFactory;
+import org.apache.metron.hbase.client.FakeHBaseConnectionFactory;
import org.apache.metron.rest.MetronRestConstants;
import org.apache.metron.rest.model.AlertsUIUserSettings;
import org.apache.metron.rest.service.KafkaService;
+import org.apache.metron.rest.user.HBaseUserSettingsClient;
import org.apache.metron.rest.user.UserSettingsClient;
import org.junit.Before;
import org.junit.Test;
@@ -50,27 +37,34 @@ import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.core.userdetails.UserDetails;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.metron.rest.service.impl.AlertsUIServiceImpl.ALERT_USER_SETTING_TYPE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests the {@link AlertsUIServiceImpl} class.
+ */
@SuppressWarnings("unchecked")
public class AlertsUIServiceImplTest {
public static ThreadLocal<ObjectMapper> _mapper = ThreadLocal.withInitial(() ->
new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL));
- /**
- * {
- * "tableColumns": ["user1_field"]
- * }
- */
- @Multiline
- public static String user1AlertUserSettings;
-
- /**
- * {
- * "tableColumns": ["user2_field"]
- * }
- */
- @Multiline
- public static String user2AlertUserSettings;
+ private static final String field = "field";
+ private static final String value1 = "value1";
+ private static final String value2 = "value2";
+ private static final String escalationTopic = "escalation";
private KafkaService kafkaService;
private Environment environment;
@@ -85,7 +79,19 @@ public class AlertsUIServiceImplTest {
public void setUp() throws Exception {
kafkaService = mock(KafkaService.class);
environment = mock(Environment.class);
- userSettingsClient = mock(UserSettingsClient.class);
+
+ Map<String, Object> globals = new HashMap<String, Object>() {{
+ put(HBaseUserSettingsClient.USER_SETTINGS_HBASE_TABLE, "some_table");
+ put(HBaseUserSettingsClient.USER_SETTINGS_HBASE_CF, "column_family");
+ put(HBaseUserSettingsClient.USER_SETTINGS_MAX_SCAN, "100000");
+ }};
+
+ userSettingsClient = new HBaseUserSettingsClient(
+ () -> globals,
+ new FakeHBaseClientFactory(),
+ new FakeHBaseConnectionFactory(),
+ HBaseConfiguration.create());
+ userSettingsClient.init();
alertsUIService = new AlertsUIServiceImpl(kafkaService, environment, userSettingsClient);
// use a fake clock for testing
@@ -103,12 +109,7 @@ public class AlertsUIServiceImplTest {
@Test
public void escalateAlertShouldSendMessageToKafka() throws Exception {
- final String field = "field";
- final String value1 = "value1";
- final String value2 = "value2";
-
// define the escalation topic
- final String escalationTopic = "escalation";
when(environment.getProperty(MetronRestConstants.KAFKA_TOPICS_ESCALATION_PROPERTY)).thenReturn(escalationTopic);
// create an alert along with the expected escalation message that is sent to kafka
@@ -126,63 +127,63 @@ public class AlertsUIServiceImplTest {
}
@Test
- public void getShouldProperlyReturnActiveProfile() throws Exception {
- when(userSettingsClient.findOne(user1, AlertsUIServiceImpl.ALERT_USER_SETTING_TYPE)).thenReturn(Optional.of(user1AlertUserSettings));
-
- AlertsUIUserSettings expectedAlertsUIUserSettings = new AlertsUIUserSettings();
- expectedAlertsUIUserSettings.setTableColumns(Collections.singletonList("user1_field"));
- assertEquals(expectedAlertsUIUserSettings, alertsUIService.getAlertsUIUserSettings().get());
- verify(userSettingsClient, times(1)).findOne(user1, AlertsUIServiceImpl.ALERT_USER_SETTING_TYPE);
- verifyNoMoreInteractions(userSettingsClient);
+ public void shouldGetActiveProfile() throws Exception {
+ AlertsUIUserSettings expected = new AlertsUIUserSettings();
+ expected.setTableColumns(Collections.singletonList("user1_field"));
+
+ // save a profile for current user
+ userSettingsClient.save(user1, ALERT_USER_SETTING_TYPE, toJSON(expected));
+
+ // retrieve the active profile
+ assertEquals(expected, alertsUIService.getAlertsUIUserSettings().get());
}
@Test
- public void findAllShouldProperlyReturnActiveProfiles() throws Exception {
- AlertsUIUserSettings alertsProfile1 = new AlertsUIUserSettings();
- alertsProfile1.setUser(user1);
- AlertsUIUserSettings alertsProfile2 = new AlertsUIUserSettings();
- alertsProfile2.setUser(user1);
- when(userSettingsClient.findAll(AlertsUIServiceImpl.ALERT_USER_SETTING_TYPE))
- .thenReturn(new HashMap<String, Optional<String>>() {{
- put(user1, Optional.of(user1AlertUserSettings));
- put(user2, Optional.of(user2AlertUserSettings));
- }});
-
- AlertsUIUserSettings expectedAlertsUIUserSettings1 = new AlertsUIUserSettings();
- expectedAlertsUIUserSettings1.setTableColumns(Collections.singletonList("user1_field"));
- AlertsUIUserSettings expectedAlertsUIUserSettings2 = new AlertsUIUserSettings();
- expectedAlertsUIUserSettings2.setTableColumns(Collections.singletonList("user2_field"));
+ public void shouldFindAllActiveProfiles() throws Exception {
+ AlertsUIUserSettings settings1 = new AlertsUIUserSettings();
+ settings1.setTableColumns(Collections.singletonList("user1_field"));
+
+ AlertsUIUserSettings settings2 = new AlertsUIUserSettings();
+ settings2.setTableColumns(Collections.singletonList("user2_field"));
+
+ // save some profiles
+ userSettingsClient.save(user1, ALERT_USER_SETTING_TYPE, toJSON(settings1));
+ userSettingsClient.save(user2, ALERT_USER_SETTING_TYPE, toJSON(settings2));
+
+ // retrieve all active profiles
Map<String, AlertsUIUserSettings> actualAlertsProfiles = alertsUIService.findAllAlertsUIUserSettings();
assertEquals(2, actualAlertsProfiles.size());
- assertEquals(expectedAlertsUIUserSettings1, actualAlertsProfiles.get(user1));
- assertEquals(expectedAlertsUIUserSettings2, actualAlertsProfiles.get(user2));
-
- verify(userSettingsClient, times(1)).findAll(AlertsUIServiceImpl.ALERT_USER_SETTING_TYPE);
- verifyNoMoreInteractions(userSettingsClient);
+ assertEquals(settings1, actualAlertsProfiles.get(user1));
+ assertEquals(settings2, actualAlertsProfiles.get(user2));
}
@Test
- public void saveShouldProperlySaveActiveProfile() throws Exception {
- AlertsUIUserSettings alertsUIUserSettings = new AlertsUIUserSettings();
- alertsUIUserSettings.setTableColumns(Collections.singletonList("user1_field"));
+ public void shouldSaveActiveProfile() throws Exception {
+ AlertsUIUserSettings expected = new AlertsUIUserSettings();
+ expected.setTableColumns(Collections.singletonList("user1_field"));
- alertsUIService.saveAlertsUIUserSettings(alertsUIUserSettings);
+ // save an active profile
+ alertsUIService.saveAlertsUIUserSettings(expected);
- String expectedAlertUserSettings = _mapper.get().writeValueAsString(alertsUIUserSettings);
- verify(userSettingsClient, times(1))
- .save(user1, AlertsUIServiceImpl.ALERT_USER_SETTING_TYPE, expectedAlertUserSettings);
- verifyNoMoreInteractions(userSettingsClient);
+ // get the active profile
+ Optional<AlertsUIUserSettings> actual = alertsUIService.getAlertsUIUserSettings();
+ assertEquals(expected, actual.get());
}
+
@Test
- public void deleteShouldProperlyDeleteActiveProfile() throws Exception {
- assertTrue(alertsUIService.deleteAlertsUIUserSettings(user1));
+ public void shouldDeleteActiveProfile() throws Exception {
+ AlertsUIUserSettings expected = new AlertsUIUserSettings();
+ expected.setTableColumns(Collections.singletonList("user1_field"));
- doThrow(new IOException()).when(userSettingsClient).delete(user1, AlertsUIServiceImpl.ALERT_USER_SETTING_TYPE);
- assertFalse(alertsUIService.deleteAlertsUIUserSettings(user1));
+ userSettingsClient.save(user1, ALERT_USER_SETTING_TYPE, toJSON(expected));
+ assertTrue(alertsUIService.deleteAlertsUIUserSettings(user1));
+ }
- verify(userSettingsClient, times(2)).delete(user1, AlertsUIServiceImpl.ALERT_USER_SETTING_TYPE);
- verifyNoMoreInteractions(userSettingsClient);
+ @Test
+ public void shouldNotDeleteMissingProfile() throws Exception {
+ // no profile saved for 'user999'
+ assertFalse(alertsUIService.deleteAlertsUIUserSettings("user999"));
}
/**
@@ -209,4 +210,8 @@ public class AlertsUIServiceImplTest {
map.put(key, value);
return map;
}
+
+ private String toJSON(AlertsUIUserSettings settings) throws JsonProcessingException {
+ return _mapper.get().writeValueAsString(settings);
+ }
}
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/user/HBaseUserSettingsClientTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/user/HBaseUserSettingsClientTest.java
new file mode 100644
index 0000000..c4b3ada
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/user/HBaseUserSettingsClientTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.user;
+
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.client.FakeHBaseClient;
+import org.apache.metron.hbase.client.FakeHBaseClientFactory;
+import org.apache.metron.hbase.client.FakeHBaseConnectionFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.metron.rest.user.HBaseUserSettingsClient.USER_SETTINGS_HBASE_CF;
+import static org.apache.metron.rest.user.HBaseUserSettingsClient.USER_SETTINGS_HBASE_TABLE;
+import static org.apache.metron.rest.user.HBaseUserSettingsClient.USER_SETTINGS_MAX_SCAN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class HBaseUserSettingsClientTest {
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+ private static final String tableName = "some_table";
+ private static final String columnFamily = "cf";
+ private static byte[] cf = Bytes.toBytes(columnFamily);
+
+ private Map<String, Object> globals;
+ private HBaseUserSettingsClient userSettingsClient;
+ private FakeHBaseClient hBaseClient;
+
+ @Before
+ public void setUp() throws Exception {
+ globals = new HashMap<String, Object>() {{
+ put(USER_SETTINGS_HBASE_TABLE, tableName);
+ put(USER_SETTINGS_HBASE_CF, columnFamily);
+ put(USER_SETTINGS_MAX_SCAN, "100000");
+ }};
+
+ hBaseClient = new FakeHBaseClient();
+ hBaseClient.deleteAll();
+
+ userSettingsClient = new HBaseUserSettingsClient(
+ () -> globals,
+ new FakeHBaseClientFactory(hBaseClient),
+ new FakeHBaseConnectionFactory(),
+ HBaseConfiguration.create());
+ userSettingsClient.init();
+
+ // timezone settings
+ userSettingsClient.save("user1", "timezone", "EST");
+ userSettingsClient.save("user2", "timezone", "MST");
+ userSettingsClient.save("user3", "timezone", "PST");
+
+ // language settings
+ userSettingsClient.save("user1", "language", "EN");
+ userSettingsClient.save("user2", "language", "ES");
+ userSettingsClient.save("user3", "language", "FR");
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ userSettingsClient.close();
+ }
+
+ @Test
+ public void shouldFindSetting() throws Exception {
+ assertEquals("EST", userSettingsClient.findOne("user1", "timezone").get());
+ assertEquals("MST", userSettingsClient.findOne("user2", "timezone").get());
+ assertEquals("PST", userSettingsClient.findOne("user3", "timezone").get());
+
+ assertEquals("EN", userSettingsClient.findOne("user1", "language").get());
+ assertEquals("ES", userSettingsClient.findOne("user2", "language").get());
+ assertEquals("FR", userSettingsClient.findOne("user3", "language").get());
+ }
+
+ @Test
+ public void shouldNotFindSetting() throws Exception {
+ assertFalse(userSettingsClient.findOne("user999", "timezone").isPresent());
+ assertFalse(userSettingsClient.findOne("user999", "language").isPresent());
+ }
+
+ @Test
+ public void shouldFindSettingsForUser() throws Exception {
+ Map<String, String> settings = userSettingsClient.findOne("user1");
+ assertEquals(2, settings.size());
+ assertEquals("EN", settings.get("language"));
+ assertEquals("EST", settings.get("timezone"));
+ }
+
+ @Test
+ public void shouldNotFindSettingsForUser() throws Exception {
+ Map<String, String> settings = userSettingsClient.findOne("user999");
+ assertEquals(0, settings.size());
+ }
+
+ @Test
+ public void shouldFindAllSettingsByType() throws Exception {
+ Map<String, Optional<String>> settings = userSettingsClient.findAll("timezone");
+
+ // there should be a 'timezone' setting defined for each user
+ int numberOfUsers = 3;
+ assertEquals(numberOfUsers, settings.size());
+ assertEquals("EST", settings.get("user1").get());
+ assertEquals("MST", settings.get("user2").get());
+ assertEquals("PST", settings.get("user3").get());
+ }
+
+ @Test
+ public void shouldFindAllSettings() throws Exception {
+ Map<String, Map<String, String>> settings = userSettingsClient.findAll();
+
+ // there should be a set of settings defined for each user
+ int numberOfUsers = 3;
+ assertEquals(numberOfUsers, settings.size());
+
+ assertEquals("EST", settings.get("user1").get("timezone"));
+ assertEquals("MST", settings.get("user2").get("timezone"));
+ assertEquals("PST", settings.get("user3").get("timezone"));
+ assertEquals("EN", settings.get("user1").get("language"));
+ assertEquals("ES", settings.get("user2").get("language"));
+ assertEquals("FR", settings.get("user3").get("language"));
+ }
+
+ @Test
+ public void shouldDeleteSetting() throws Exception {
+ // before deleting the "timezone" setting, they should all exist
+ assertEquals("EST", userSettingsClient.findOne("user1", "timezone").get());
+ assertEquals("EN", userSettingsClient.findOne("user1", "language").get());
+
+ // delete the "timezone" setting for "user1"
+ userSettingsClient.delete("user1", "timezone");
+
+ assertFalse(userSettingsClient.findOne("user1", "timezone").isPresent());
+ assertEquals("EN", userSettingsClient.findOne("user1", "language").get());
+ }
+
+ @Test
+ public void shouldNotDeleteSetting() throws Exception {
+ // delete the "timezone" setting for a user that does not exist
+ userSettingsClient.delete("user999", "timezone");
+
+ // the settings should still not exist and nothing should blow-up
+ assertFalse(userSettingsClient.findOne("user999", "timezone").isPresent());
+ assertFalse(userSettingsClient.findOne("user999", "language").isPresent());
+ }
+
+ @Test
+ public void shouldDeleteSettingsByUser() throws Exception {
+ // before deleting user1, all the settings should exist
+ assertEquals("EST", userSettingsClient.findOne("user1", "timezone").get());
+ assertEquals("EN", userSettingsClient.findOne("user1", "language").get());
+
+ // delete the user
+ userSettingsClient.delete("user1");
+
+ // none of user1's settings should exist any longer
+ assertFalse(userSettingsClient.findOne("user1", "timezone").isPresent());
+ assertFalse(userSettingsClient.findOne("user1", "language").isPresent());
+ }
+
+ @Test
+ public void shouldNotDeleteSettingsByUser() throws Exception {
+ // delete the settings for a user that does not exist
+ userSettingsClient.delete("user999");
+
+ // the settings should still not exist and nothing should blow-up
+ Map<String, String> settings = userSettingsClient.findOne("user999");
+ assertEquals(0, settings.size());
+ }
+}
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/user/UserSettingsClientTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/user/UserSettingsClientTest.java
deleted file mode 100644
index 5b4f786..0000000
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/user/UserSettingsClientTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.rest.user;
-
-
-import static org.apache.metron.rest.user.UserSettingsClient.USER_SETTINGS_HBASE_CF;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Supplier;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-public class UserSettingsClientTest {
-
- @Rule
- public final ExpectedException exception = ExpectedException.none();
-
- private static ThreadLocal<ObjectMapper> _mapper = ThreadLocal.withInitial(() ->
- new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL));
-
- private HTableInterface userSettingsTable;
- private Supplier<Map<String, Object>> globalConfigSupplier;
- private UserSettingsClient userSettingsClient;
- private static byte[] cf = Bytes.toBytes("cf");
-
- @Before
- public void setUp() throws Exception {
- userSettingsTable = mock(HTableInterface.class);
- globalConfigSupplier = () -> new HashMap<String, Object>() {{
- put(USER_SETTINGS_HBASE_CF, "cf");
- }};
- }
-
- @Test
- public void shouldFindOne() throws Exception {
- Result result = mock(Result.class);
- when(result.getValue(cf, Bytes.toBytes("type"))).thenReturn("userSettings1String".getBytes());
- Get get = new Get("user1".getBytes());
- get.addFamily(cf);
- when(userSettingsTable.get(get)).thenReturn(result);
-
- UserSettingsClient userSettingsClient = new UserSettingsClient(userSettingsTable, cf);
- assertEquals("userSettings1String", userSettingsClient.findOne("user1", "type").get());
- assertFalse(userSettingsClient.findOne("missingUser", "type").isPresent());
- }
-
- @Test
- public void shouldFindAll() throws Exception {
- ResultScanner resultScanner = mock(ResultScanner.class);
- Result result1 = mock(Result.class);
- Result result2 = mock(Result.class);
- when(result1.getRow()).thenReturn("user1".getBytes());
- when(result2.getRow()).thenReturn("user2".getBytes());
- when(result1.getValue(cf, Bytes.toBytes("type"))).thenReturn("userSettings1String".getBytes());
- when(result2.getValue(cf, Bytes.toBytes("type"))).thenReturn("userSettings2String".getBytes());
- when(resultScanner.iterator()).thenReturn(Arrays.asList(result1, result2).iterator());
- when(userSettingsTable.getScanner(any(Scan.class))).thenReturn(resultScanner);
-
- UserSettingsClient userSettingsClient = new UserSettingsClient(userSettingsTable, cf);
- assertEquals(new HashMap<String, Optional<String>>() {{
- put("user1", Optional.of("userSettings1String"));
- put("user2", Optional.of("userSettings2String"));
- }}, userSettingsClient.findAll("type"));
- }
-
-}
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 7226c30..9cd3867 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -119,6 +119,11 @@ public class ElasticsearchDao implements IndexDao {
}
@Override
+ public void close() throws IOException {
+ // nothing to do
+ }
+
+ @Override
public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
return this.searchDao.search(searchRequest);
}
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
index ac5417e..aa064d3 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
@@ -161,6 +161,11 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
}
@Override
+ public void close() throws IOException {
+ // nothing to do
+ }
+
+ @Override
public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException {
return indexDao.getColumnMetadata(indices);
}
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java
index cabb992..f1cd265 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java
@@ -68,6 +68,10 @@ public class ElasticsearchMetaAlertDaoTest {
}
@Override
+ public void close() throws IOException {
+ }
+
+ @Override
public Document getLatest(String guid, String sensorType) {
return null;
}
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
index 16760d2..97afd41 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
@@ -160,11 +160,6 @@ public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest {
return es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
}
- @Override
- protected MockHTable getMockHTable() {
- return table;
- }
-
/**
* Install the index template to ensure that "guid" is of type "keyword". The
* {@link org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao} cannot find
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/TableConfig.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/TableConfig.java
index de2e929..0033cb7 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/TableConfig.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/TableConfig.java
@@ -32,7 +32,6 @@ public class TableConfig implements Serializable {
private String connectorImpl;
public TableConfig() {
-
}
public TableConfig(String tableName) {
@@ -85,6 +84,7 @@ public class TableConfig implements Serializable {
public void setBatch(boolean batch) {
this.batch = batch;
}
+
/**
* @param writeBufferSize
* Overrides the client-side write buffer size.
@@ -107,12 +107,11 @@ public class TableConfig implements Serializable {
public long getWriteBufferSize() {
return writeBufferSize;
}
+
/**
* @return A Set of configured column families
*/
public Set<String> getColumnFamilies() {
return this.columnFamilies.keySet();
}
-
-
}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/FakeHBaseClient.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/FakeHBaseClient.java
new file mode 100644
index 0000000..a2fce1f
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/FakeHBaseClient.java
@@ -0,0 +1,335 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.ColumnList;
+import org.apache.metron.hbase.HBaseProjectionCriteria;
+import org.mockito.AdditionalMatchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * An {@link HBaseClient} useful for testing.
+ *
+ * <p>Maintains a static, in-memory set of records to mimic the behavior of
+ * an {@link HBaseClient} that interacts with HBase.
+ */
+public class FakeHBaseClient implements HBaseClient, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * The mutations that have been persisted. Represents mutations that would
+ * have been written to HBase.
+ *
+ * <p>This is static so that all instantiated clients 'see' the same set of records.
+ */
+ private static Map<ByteBuffer, ColumnList> persisted = Collections.synchronizedMap(new HashMap<>());
+
+ /**
+ * Represents a mutation that was submitted to the {@link FakeHBaseClient}.
+ */
+ public static class Mutation {
+ public byte[] rowKey;
+ public ColumnList columnList;
+
+ public Mutation(byte[] rowKey, ColumnList columnList) {
+ this.rowKey = rowKey;
+ this.columnList = columnList;
+ }
+ }
+
+ /**
+ * The set of queued or pending mutations.
+ */
+ private List<Mutation> queuedMutations;
+
+ /**
+ * The set of queued or pending gets.
+ */
+ private List<ByteBuffer> queuedGets;
+
+ public FakeHBaseClient() {
+ queuedGets = new ArrayList<>();
+ queuedMutations = new ArrayList<>();
+ }
+
+ /**
+ * Deletes all records persisted in the static, in-memory collection.
+ */
+ public void deleteAll() {
+ persisted.clear();
+ }
+
+ /**
+ * Returns all mutations that have been persisted in the static, in-memory collection.
+ */
+ public List<Mutation> getAllPersisted() {
+ return persisted.entrySet()
+ .stream()
+ .map(entry -> new Mutation(entry.getKey().array(), entry.getValue()))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void addGet(byte[] rowKey, HBaseProjectionCriteria criteria) {
+ queuedGets.add(ByteBuffer.wrap(rowKey));
+ }
+
+ @Override
+ public Result[] getAll() {
+ LOG.debug("Looking for {} get(s) amongst {} persisted record(s)", queuedGets.size(), persisted.size());
+ List<Result> results = new ArrayList<>();
+ for (int i = 0; i < queuedGets.size(); i++) {
+ ByteBuffer rowKey = queuedGets.get(i);
+ Result result;
+ if (persisted.containsKey(rowKey)) {
+ ColumnList cols = persisted.get(rowKey);
+ result = matchingResult(rowKey.array(), cols);
+
+ } else {
+ result = emptyResult();
+ }
+ results.add(result);
+ }
+
+ clearGets();
+ return results.stream().toArray(Result[]::new);
+ }
+
+ /**
+ * Builds a mock {@link Result} that will respond correctly to the following methods calls.
+ *
+ * Result.containsColumn(family, qualifier)
+ * Result.getValue(family, qualifier)
+ * Result.getFamilyMap(family)
+ *
+ * @param columns The columns.
+ * @return A {@link Result}.
+ */
+ private static Result matchingResult(byte[] rowKey, ColumnList columns) {
+ // Result.getRow() should return the row key
+ Result result = mock(Result.class);
+ when(result.getRow())
+ .thenReturn(rowKey);
+
+ // find all column families
+ Set<ByteBuffer> families = new HashSet<>();
+ for (ColumnList.Column column : columns.getColumns()) {
+ families.add(ByteBuffer.wrap(column.getFamily()));
+ }
+
+ // for each column family
+ for (ByteBuffer family: families) {
+
+ // build the family map
+ NavigableMap<byte[], byte[]> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (ColumnList.Column column : columns.getColumns()) {
+
+ // is this column in the current family?
+ if (family.equals(ByteBuffer.wrap(column.getFamily()))) {
+ LOG.debug("Found column in family; {}:{}", Bytes.toString(column.getFamily()),
+ Bytes.toString(column.getQualifier()));
+
+ // Result.containsColumn(family, qualifier) should return true
+ when(result.containsColumn(eq(column.getFamily()), eq(column.getQualifier())))
+ .thenReturn(true);
+
+ // Result.getValue(family, qualifier) should return the value
+ when(result.getValue(eq(column.getFamily()), eq(column.getQualifier())))
+ .thenReturn(column.getValue());
+
+ familyMap.put(column.getQualifier(), column.getValue());
+ }
+ }
+ LOG.debug("Built family map; family={}, size={}", Bytes.toString(family.array()), familyMap.size());
+
+ // Result.getFamilyMap(family) should return all values in that family
+ when(result.getFamilyMap(AdditionalMatchers.aryEq(family.array())))
+ .thenReturn(familyMap);
+ }
+
+ return result;
+ }
+
+ private static Result emptyResult() {
+ Result result = mock(Result.class);
+ when(result.containsColumn(any(), any()))
+ .thenReturn(false);
+ when(result.isEmpty())
+ .thenReturn(true);
+ return result;
+ }
+
+ @Override
+ public void clearGets() {
+ queuedGets.clear();
+ }
+
+ @Override
+ public List<String> scanRowKeys() {
+ return persisted
+ .keySet()
+ .stream()
+ .map(buffer -> Bytes.toString(buffer.array()))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Result[] scan(int numRows) throws IOException {
+ return persisted
+ .entrySet()
+ .stream()
+ .limit(numRows)
+ .map((entry) -> matchingResult(entry.getKey().array(), entry.getValue()))
+ .toArray(Result[]::new);
+ }
+
+ @Override
+ public void addMutation(byte[] rowKey, ColumnList cols) {
+ for (ColumnList.Column column : cols.getColumns()) {
+ String family = Bytes.toString(column.getFamily());
+ String qualifier = Bytes.toString(column.getQualifier());
+ LOG.debug("Queuing mutation column; {}:{}", family, qualifier);
+ }
+ queuedMutations.add(new Mutation(rowKey, cols));
+ }
+
+ @Override
+ public void addMutation(byte[] rowKey, ColumnList cols, Durability durability) {
+ // ignore durability
+ addMutation(rowKey, cols);
+ }
+
+ @Override
+ public void addMutation(byte[] rowKey, ColumnList cols, Durability durability, Long timeToLiveMillis) {
+ // ignore durability and time-to-live
+ addMutation(rowKey, cols);
+ }
+
+ @Override
+ public int mutate() {
+ int numberOfMutations = queuedMutations.size();
+
+ // persist each queued mutation
+ for(Mutation mutation: queuedMutations) {
+ ByteBuffer key = ByteBuffer.wrap(mutation.rowKey);
+ ColumnList columns = mutation.columnList;
+
+ // if there are existing columns, need to merge the columns
+ ColumnList existing = persisted.get(key);
+ if(existing != null) {
+ for(ColumnList.Column col: existing.getColumns()) {
+ columns.addColumn(col);
+ }
+ }
+
+ persisted.put(key, columns);
+ }
+ clearMutations();
+
+ LOG.debug("Wrote {} record(s); now have {} record(s) in all", numberOfMutations, persisted.size());
+ return numberOfMutations;
+ }
+
+ @Override
+ public void clearMutations() {
+ queuedMutations.clear();
+ }
+
+ @Override
+ public void delete(byte[] rowKey) {
+ persisted.remove(ByteBuffer.wrap(rowKey));
+ }
+
+ @Override
+ public void delete(byte[] rowKey, ColumnList toDelete) {
+ ColumnList existingColumns = persisted.get(ByteBuffer.wrap(rowKey));
+ if(existingColumns != null) {
+ // the names of columns that need deleted
+ List<String> columnsToDelete = nameOf(toDelete);
+ LOG.debug("About to delete columns; existing={}, toDelete={}", nameOf(existingColumns), columnsToDelete);
+
+ // build a new set of columns that removes any columns that need deleted
+ ColumnList newColumns = new ColumnList();
+ for(ColumnList.Column existing: existingColumns.getColumns()) {
+
+ // only keep the columns that do not need to be deleted
+ boolean keepColumn = !columnsToDelete.contains(nameOf(existing));
+ if(keepColumn) {
+ newColumns.addColumn(existing);
+ } else {
+ LOG.debug("Column was deleted; column={}", existing);
+ }
+ }
+
+ if(newColumns.hasColumns()) {
+ // persist the new columns
+ persisted.put(ByteBuffer.wrap(rowKey), newColumns);
+
+ } else {
+ // there are no columns left, so remove the row
+ persisted.remove(ByteBuffer.wrap(rowKey));
+ }
+
+ } else {
+ LOG.debug("Nothing to delete");
+ }
+ }
+
+ private List<String> nameOf(ColumnList columnList) {
+ return columnList
+ .getColumns()
+ .stream()
+ .map(col -> nameOf(col))
+ .collect(Collectors.toList());
+ }
+
+ private String nameOf(ColumnList.Column column) {
+ return Bytes.toString(column.getFamily()) + ":" + Bytes.toString(column.getQualifier());
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/FakeHBaseClientFactory.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/FakeHBaseClientFactory.java
new file mode 100644
index 0000000..135e693
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/FakeHBaseClientFactory.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Creates a {@link FakeHBaseClient}.
+ */
+public class FakeHBaseClientFactory implements HBaseClientFactory {
+
+ private FakeHBaseClient hBaseClient;
+
+ public FakeHBaseClientFactory() {
+ this(new FakeHBaseClient());
+ }
+
+ public FakeHBaseClientFactory(FakeHBaseClient hBaseClient) {
+ this.hBaseClient = hBaseClient;
+ }
+
+ @Override
+ public HBaseClient create(HBaseConnectionFactory factory, Configuration configuration, String tableName) {
+ return hBaseClient;
+ }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/FakeHBaseClientTest.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/FakeHBaseClientTest.java
new file mode 100644
index 0000000..9dca390
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/FakeHBaseClientTest.java
@@ -0,0 +1,268 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.ColumnList;
+import org.apache.metron.hbase.HBaseProjectionCriteria;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests the {@link FakeHBaseClient} class.
+ */
+public class FakeHBaseClientTest {
+ private static final String columnFamily = "W";
+ private static final byte[] columnFamilyB = Bytes.toBytes(columnFamily);
+ private static final String columnQualifier = "column";
+ private static final byte[] columnQualifierB = Bytes.toBytes(columnQualifier);
+ private static final String rowKey1String = "row-key-1";
+ private static final byte[] rowKey1 = Bytes.toBytes(rowKey1String);
+ private static final String rowKey2String = "row-key-2";
+ private static final byte[] rowKey2 = Bytes.toBytes(rowKey2String);
+
+ private FakeHBaseClient client;
+
+ @Before
+ public void setup() {
+ client = new FakeHBaseClient();
+ client.deleteAll();
+ }
+
+ @Test
+ public void testMutate() throws Exception {
+ // write some values
+ ColumnList columns = new ColumnList()
+ .addColumn(columnFamily, columnQualifier, "value1");
+ client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+ client.mutate();
+
+ // read back the value
+ client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+ Result[] results = client.getAll();
+
+ // validate
+ assertEquals(1, results.length);
+ assertEquals("value1", getValue(results[0], columnFamily, columnQualifier));
+ }
+
+ @Test
+ public void testMutateMultipleColumns() throws Exception {
+ // write some values
+ ColumnList columns = new ColumnList()
+ .addColumn(columnFamily, "col1", "value1")
+ .addColumn(columnFamily, "col2", "value2");
+ client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+ client.mutate();
+
+ // read back the value
+ client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+ Result[] results = client.getAll();
+
+ // validate
+ assertEquals(1, results.length);
+ assertEquals("value1", getValue(results[0], columnFamily, "col1"));
+ assertEquals("value2", getValue(results[0], columnFamily, "col2"));
+ }
+
+ @Test
+ public void testNoMutations() throws Exception {
+ // do not add any mutations before attempting to write
+ int count = client.mutate();
+ Assert.assertEquals(0, count);
+
+ // attempt to read
+ HBaseProjectionCriteria criteria = new HBaseProjectionCriteria().addColumnFamily(columnFamily);
+ client.addGet(rowKey1, criteria);
+ client.addGet(rowKey2, criteria);
+ Result[] results = client.getAll();
+
+ // nothing should have been read
+ assertEquals(2, results.length);
+ for(Result result : results) {
+ Assert.assertTrue(result.isEmpty());
+ }
+ }
+
+ /**
+ * Unfortunately, the {@link Result} returned by the {@link FakeHBaseClient} is a mock and needs
+ * to respond like an actual {@link Result}. This test ensures that {@link Result#getFamilyMap(byte[])}
+ * works correctly.
+ */
+ @Test
+ public void testResultFamilyMap() {
+ // write some values
+ ColumnList columns = new ColumnList()
+ .addColumn(columnFamily, "col1", "value1")
+ .addColumn(columnFamily, "col2", "value2");
+ client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+ client.mutate();
+
+ // read back the value
+ HBaseProjectionCriteria criteria = new HBaseProjectionCriteria()
+ .addColumnFamily(columnFamily);
+ client.addGet(rowKey1, criteria);
+ Result[] results = client.getAll();
+
+ // validate
+ assertEquals(1, results.length);
+ Result result = results[0];
+
+ // the first column
+ String value1 = Bytes.toString(result.getFamilyMap(columnFamilyB).get(Bytes.toBytes("col1")));
+ assertEquals("value1", value1);
+
+ // the second column
+ String value2 = Bytes.toString(result.getFamilyMap(columnFamilyB).get(Bytes.toBytes("col2")));
+ assertEquals("value2", value2);
+ }
+
+ @Test
+ public void testScan() throws Exception {
+ // write some values
+ client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+ client.mutate();
+
+ // scan the table
+ Result[] results = client.scan(10);
+ assertEquals(1, results.length);
+
+ assertArrayEquals(rowKey1, results[0].getRow());
+ String actual1 = Bytes.toString(results[0].getValue(columnFamilyB, columnQualifierB));
+ assertEquals("value1", actual1);
+ }
+
+ @Test
+ public void testScanLimit() throws Exception {
+ // write some values
+ client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+ client.addMutation(rowKey2, new ColumnList().addColumn(columnFamily, columnQualifier, "value2"), Durability.SKIP_WAL);
+ client.mutate();
+
+ // scan the table, but limit to 1 result
+ Result[] results = client.scan(1);
+ assertEquals(1, results.length);
+ }
+
+ @Test
+ public void testScanNothing() throws Exception {
+ // scan the table, but there is nothing there
+ Result[] results = client.scan(1);
+ assertEquals(0, results.length);
+ }
+
+ @Test
+ public void testScanRowKeys() throws Exception {
+ // write some values
+ client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+ client.addMutation(rowKey2, new ColumnList().addColumn(columnFamily, columnQualifier, "value2"), Durability.SKIP_WAL);
+ client.mutate();
+
+ // scan the table
+ List<String> rowKeys = client.scanRowKeys();
+ List<String> expected = Arrays.asList(rowKey1String, rowKey2String);
+ assertEquals(new HashSet<>(expected), new HashSet<>(rowKeys));
+ }
+
+ @Test
+ public void testDelete() {
+ // write some values
+ client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+ client.addMutation(rowKey2, new ColumnList().addColumn(columnFamily, columnQualifier, "value2"), Durability.SKIP_WAL);
+ client.mutate();
+
+ client.delete(rowKey1);
+
+ // the deleted row key should no longer exist
+ client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+ Assert.assertTrue(client.getAll()[0].isEmpty());
+
+ // the other row key should remain
+ client.addGet(rowKey2, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+ Assert.assertFalse(client.getAll()[0].isEmpty());
+ }
+
+ @Test
+ public void testDeleteNothing() {
+ // nothing should blow-up if we attempt to delete something that does not exist
+ client.delete(rowKey1);
+ }
+
+ @Test
+ public void testDeleteColumn() {
+ // write some values
+ ColumnList columns = new ColumnList()
+ .addColumn(columnFamily, "col1", "value1")
+ .addColumn(columnFamily, "col2", "value2");
+ client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+ client.mutate();
+
+ // delete a column
+ client.delete(rowKey1, new ColumnList().addColumn(columnFamily, "col1"));
+
+ // read back the value
+ client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+ Result[] results = client.getAll();
+
+ // validate
+ assertEquals(1, results.length);
+ assertNull(getValue(results[0], columnFamily, "col1"));
+ assertEquals("value2", getValue(results[0], columnFamily, "col2"));
+ }
+
+ @Test
+ public void testDeleteAllColumns() {
+ // write some values
+ ColumnList columns = new ColumnList()
+ .addColumn(columnFamily, "col1", "value1")
+ .addColumn(columnFamily, "col2", "value2");
+ client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+ client.mutate();
+
+ // delete both columns individually
+ client.delete(rowKey1, new ColumnList().addColumn(columnFamily, "col1"));
+ client.delete(rowKey1, new ColumnList().addColumn(columnFamily, "col2"));
+
+ // read back the value
+ client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+ Result[] results = client.getAll();
+
+ // validate
+ assertEquals(1, results.length);
+ assertNull(getValue(results[0], columnFamily, "col1"));
+ assertNull(getValue(results[0], columnFamily, "col2"));
+ }
+
+ private String getValue(Result result, String columnFamily, String columnQualifier) {
+ byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier));
+ return Bytes.toString(value);
+ }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/FakeHBaseConnectionFactory.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/FakeHBaseConnectionFactory.java
new file mode 100644
index 0000000..d44e110
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/FakeHBaseConnectionFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * A mock {@link HBaseConnectionFactory} useful for testing.
+ */
+public class FakeHBaseConnectionFactory extends HBaseConnectionFactory {
+
+ /**
+ * A set of {@link Table}s that will be returned by all {@link Connection}
+ * objects created by this factory.
+ */
+ private Map<TableName, Table> tables;
+
+ public FakeHBaseConnectionFactory() {
+ this.tables = new HashMap<>();
+ }
+
+ /**
+ * The {@link Connection} returned by this factory will return the given table by
+ * name when {@link Connection#getTable(TableName)} is called.
+ *
+ * @param tableName The name of the table.
+ * @param table The table.
+ * @return
+ */
+ public FakeHBaseConnectionFactory withTable(String tableName, Table table) {
+ this.tables.put(TableName.valueOf(tableName), table);
+ return this;
+ }
+
+ /**
+ * The {@link Connection} returned by this factory will return a table by
+ * name when {@link Connection#getTable(TableName)} is called.
+ *
+ * @param tableName The name of the table.
+ * @return
+ */
+ public FakeHBaseConnectionFactory withTable(String tableName) {
+ return withTable(tableName, mock(Table.class));
+ }
+
+ public Table getTable(String tableName) {
+ return tables.get(TableName.valueOf(tableName));
+ }
+
+ @Override
+ public Connection createConnection(Configuration configuration) throws IOException {
+ Connection connection = mock(Connection.class);
+
+ // the connection must return each of the given tables by name
+ for(Map.Entry<TableName, Table> entry: tables.entrySet()) {
+ TableName tableName = entry.getKey();
+ Table table = entry.getValue();
+ when(connection.getTable(eq(tableName))).thenReturn(table);
+ }
+
+ return connection;
+ }
+}
diff --git a/metron-platform/metron-indexing/metron-indexing-common/pom.xml b/metron-platform/metron-indexing/metron-indexing-common/pom.xml
index df81532..3f8e812 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/pom.xml
+++ b/metron-platform/metron-indexing/metron-indexing-common/pom.xml
@@ -175,6 +175,34 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
index b1df46a..9ebecb6 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
@@ -17,8 +17,11 @@
*/
package org.apache.metron.indexing.dao;
-import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+
+import java.util.function.Function;
import java.util.HashMap;
import java.util.Map;
@@ -30,9 +33,12 @@ public class AccessConfig {
private Supplier<Map<String, Object>> globalConfigSupplier;
private Function<String, String> indexSupplier;
private Map<String, String> optionalSettings = new HashMap<>();
+ private HBaseConnectionFactory hbaseConnectionFactory;
+ private Configuration hbaseConfiguration;
private TableProvider tableProvider = null;
private Boolean isKerberosEnabled = false;
+
/**
* @return A supplier which will return the current global config.
*/
@@ -86,6 +92,30 @@ public class AccessConfig {
}
/**
+ * @return The {@link HBaseConnectionFactory} that establishes connections to HBase.
+ */
+ public HBaseConnectionFactory getHbaseConnectionFactory() {
+ return hbaseConnectionFactory;
+ }
+
+ public AccessConfig setHbaseConnectionFactory(HBaseConnectionFactory hbaseConnectionFactory) {
+ this.hbaseConnectionFactory = hbaseConnectionFactory;
+ return this;
+ }
+
+ /**
+ * @return The configuration used to connect to HBase.
+ */
+ public Configuration getHbaseConfiguration() {
+ return hbaseConfiguration;
+ }
+
+ public AccessConfig setHbaseConfiguration(Configuration hbaseConfiguration) {
+ this.hbaseConfiguration = hbaseConfiguration;
+ return this;
+ }
+
+ /**
* @return The table provider to use for NoSql DAOs
*/
public TableProvider getTableProvider() {
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
index 71d0544..0256307 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
@@ -18,25 +18,18 @@
package org.apache.metron.indexing.dao;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Optional;
-import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.common.utils.KeyUtil;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
import org.apache.metron.indexing.dao.search.AlertComment;
import org.apache.metron.indexing.dao.search.FieldType;
import org.apache.metron.indexing.dao.search.GetRequest;
@@ -48,6 +41,18 @@ import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
/**
* The HBaseDao is an index dao which only supports the following actions:
* * Update
@@ -60,11 +65,12 @@ import org.apache.metron.indexing.dao.update.Document;
*
*/
public class HBaseDao implements IndexDao {
- public static String HBASE_TABLE = "update.hbase.table";
- public static String HBASE_CF = "update.hbase.cf";
- private HTableInterface tableInterface;
- private byte[] cf;
- private AccessConfig config;
+ public static final String HBASE_TABLE = "update.hbase.table";
+ public static final String HBASE_CF = "update.hbase.cf";
+ private Table table;
+ private byte[] columnFamily;
+ private AccessConfig accessConfig;
+ private Connection connection;
/**
* Implements the HBaseDao row key and exposes convenience methods for serializing/deserializing the row key.
@@ -73,6 +79,7 @@ public class HBaseDao implements IndexDao {
public static class Key {
private String guid;
private String sensorType;
+
public Key(String guid, String sensorType) {
this.guid = guid;
this.sensorType = sensorType;
@@ -132,7 +139,6 @@ public class HBaseDao implements IndexDao {
}
public HBaseDao() {
-
}
@Override
@@ -146,40 +152,72 @@ public class HBaseDao implements IndexDao {
}
@Override
- public synchronized void init(AccessConfig config) {
- if(this.tableInterface == null) {
- this.config = config;
- Map<String, Object> globalConfig = config.getGlobalConfigSupplier().get();
- if(globalConfig == null) {
- throw new IllegalStateException("Cannot find the global config.");
- }
- String table = (String)globalConfig.get(HBASE_TABLE);
- String cf = (String) config.getGlobalConfigSupplier().get().get(HBASE_CF);
- if(table == null || cf == null) {
- throw new IllegalStateException("You must configure " + HBASE_TABLE + " and " + HBASE_CF + " in the global config.");
- }
+ public synchronized void init(AccessConfig accessConfig) {
+ if(table == null) {
+ this.accessConfig = accessConfig;
+ Map<String, Object> globals = getGlobals(accessConfig);
+ String tableName = getTableName(globals);
+ columnFamily = Bytes.toBytes(getColumnFamily(globals));
+
try {
- tableInterface = config.getTableProvider().getTable(HBaseConfiguration.create(), table);
- this.cf = cf.getBytes();
+ HBaseConnectionFactory connectionFactory = accessConfig.getHbaseConnectionFactory();
+ Configuration conf = accessConfig.getHbaseConfiguration();
+ connection = connectionFactory.createConnection(conf);
+ table = connection.getTable(TableName.valueOf(tableName));
+
} catch (IOException e) {
throw new IllegalStateException("Unable to initialize HBaseDao: " + e.getMessage(), e);
}
}
}
- public HTableInterface getTableInterface() {
- if(tableInterface == null) {
- init(config);
+ @Override
+ public void close() throws IOException {
+ if(table != null) {
+ table.close();
+ }
+ if(connection != null) {
+ connection.close();
+ }
+ }
+
+ public Table getTable() {
+ if(table == null) {
+ init(accessConfig);
+ }
+ return table;
+ }
+
+ private Map<String, Object> getGlobals(AccessConfig accessConfig) {
+ Map<String, Object> globalConfig = accessConfig.getGlobalConfigSupplier().get();
+ if(globalConfig == null) {
+ throw new IllegalStateException("Cannot find the global config.");
+ }
+ return globalConfig;
+ }
+
+ private static String getTableName(Map<String, Object> globalConfig) {
+ String table = (String) globalConfig.get(HBASE_TABLE);
+ if(table == null) {
+ throw new IllegalStateException("You must configure " + HBASE_TABLE + "in the global config.");
+ }
+ return table;
+ }
+
+ private static String getColumnFamily(Map<String, Object> globalConfig) {
+ String cf = (String) globalConfig.get(HBASE_CF);
+ if(cf == null) {
+ throw new IllegalStateException("You must configure " + HBASE_CF + " in the global config.");
}
- return tableInterface;
+ return cf;
}
@Override
public synchronized Document getLatest(String guid, String sensorType) throws IOException {
Key k = new Key(guid, sensorType);
Get get = new Get(Key.toBytes(k));
- get.addFamily(cf);
- Result result = getTableInterface().get(get);
+ get.addFamily(columnFamily);
+ Result result = getTable().get(get);
return getDocumentFromResult(result);
}
@@ -190,7 +228,7 @@ public class HBaseDao implements IndexDao {
for (GetRequest getRequest: getRequests) {
gets.add(buildGet(getRequest));
}
- Result[] results = getTableInterface().get(gets);
+ Result[] results = getTable().get(gets);
List<Document> allLatest = new ArrayList<>();
for (Result result: results) {
Document d = getDocumentFromResult(result);
@@ -202,7 +240,7 @@ public class HBaseDao implements IndexDao {
}
private Document getDocumentFromResult(Result result) throws IOException {
- NavigableMap<byte[], byte[]> columns = result.getFamilyMap( cf);
+ NavigableMap<byte[], byte[]> columns = result.getFamilyMap(columnFamily);
if(columns == null || columns.size() == 0) {
return null;
}
@@ -240,7 +278,7 @@ public class HBaseDao implements IndexDao {
@Override
public synchronized Document update(Document update, Optional<String> index) throws IOException {
Put put = buildPut(update);
- getTableInterface().put(put);
+ getTable().put(put);
return update;
}
@@ -253,14 +291,14 @@ public class HBaseDao implements IndexDao {
Put put = buildPut(update);
puts.add(put);
}
- getTableInterface().put(puts);
+ getTable().put(puts);
return updates;
}
protected Get buildGet(GetRequest getRequest) throws IOException {
Key k = new Key(getRequest.getGuid(), getRequest.getSensorType());
Get get = new Get(Key.toBytes(k));
- get.addFamily(cf);
+ get.addFamily(columnFamily);
return get;
}
@@ -270,7 +308,7 @@ public class HBaseDao implements IndexDao {
long ts = update.getTimestamp() == null || update.getTimestamp() == 0 ? System.currentTimeMillis() : update.getTimestamp();
byte[] columnQualifier = Bytes.toBytes(ts);
byte[] doc = JSONUtils.INSTANCE.toJSONPretty(update.getDocument());
- put.addColumn(cf, columnQualifier, doc);
+ put.addColumn(columnFamily, columnQualifier, doc);
return put;
}
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/IndexDao.java b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
index 11b2ff0..79b9101 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
@@ -20,11 +20,13 @@ package org.apache.metron.indexing.dao;
import org.apache.metron.indexing.dao.search.SearchDao;
import org.apache.metron.indexing.dao.update.UpdateDao;
+import java.io.Closeable;
+
/**
* The IndexDao provides a common interface for retrieving and storing data in a variety of persistent stores.
* Document reads and writes require a GUID and sensor type with an index being optional.
*/
-public interface IndexDao extends UpdateDao, SearchDao, RetrieveLatestDao, ColumnMetadataDao {
+public interface IndexDao extends UpdateDao, SearchDao, RetrieveLatestDao, ColumnMetadataDao, Closeable {
String COMMENTS_FIELD = "comments";
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
index a240c56..d762605 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
@@ -256,6 +256,13 @@ public class MultiIndexDao implements IndexDao {
}
@Override
+ public void close() throws IOException {
+ for(IndexDao dao : indices) {
+ dao.close();
+ }
+ }
+
+ @Override
public Document getLatest(final String guid, String sensorType) throws IOException {
List<DocumentContainer> output = indices
.parallelStream()
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
index f49a6ad..8425798 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
@@ -215,6 +215,11 @@ public class InMemoryDao implements IndexDao {
}
@Override
+ public void close() {
+ // nothing to do
+ }
+
+ @Override
public Document getLatest(String guid, String sensorType) throws IOException {
for(Map.Entry<String, List<String>> kv: BACKING_STORE.entrySet()) {
if(kv.getKey().startsWith(sensorType)) {
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
index 5dbefdd..6b683da 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
@@ -115,6 +115,12 @@ public class InMemoryMetaAlertDao implements MetaAlertDao {
// Ignore threatSort for test.
}
+ @Override
+ public void close() throws IOException {
+ if(indexDao != null) {
+ indexDao.close();
+ }
+ }
@Override
public Document getLatest(String guid, String sensorType) throws IOException {
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
index 6333d32..f9e6be6 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
@@ -17,12 +17,12 @@ package org.apache.metron.indexing.dao;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.commons.collections.MapUtils;
import org.apache.metron.common.Constants;
-import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.indexing.dao.search.AlertComment;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
import org.apache.metron.indexing.dao.update.PatchRequest;
+import org.json.simple.parser.ParseException;
import org.junit.Assert;
import org.junit.Test;
@@ -35,9 +35,10 @@ import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
-import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
import static org.hamcrest.CoreMatchers.hasItem;
+import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
+
public abstract class UpdateIntegrationTest {
/**
@@ -272,29 +273,81 @@ public abstract class UpdateIntegrationTest {
return new Document(message1, guid, SENSOR_NAME, timestamp);
}
- private List<AlertComment> getComments(Document withComment) {
- List<Map<String, Object>> commentsField = List.class.cast(withComment.getDocument().get(COMMENTS_FIELD));
+ private static List<AlertComment> getComments(Document withComment) throws ParseException {
+ return getComments(withComment.getDocument());
+ }
+
+ private static List<AlertComment> getComments(Map<String, Object> fields) throws ParseException {
List<AlertComment> comments = new ArrayList<>();
- if(commentsField != null) {
- comments = commentsField
- .stream()
- .map(map -> new AlertComment(map))
- .collect(Collectors.toList());
+ boolean hasComments = fields.containsKey(COMMENTS_FIELD);
+ if(hasComments) {
+ List<Object> commentsField = List.class.cast(fields.get(COMMENTS_FIELD));
+ for (Object commentObject: commentsField) {
+ if (commentObject instanceof Map) {
+ // comments are stored as maps in Elasticsearch
+ Map<String, Object> commentAsMap = (Map<String, Object>) commentObject;
+ comments.add(new AlertComment(commentAsMap));
+
+ } else if (commentObject instanceof String) {
+ // comments are stored as json strings in Solr
+ String commentAsString = (String) commentObject;
+ comments.add(new AlertComment(commentAsString));
+
+ } else {
+ throw new IllegalArgumentException(String.format("Unexpected comment value; %s", commentObject));
+ }
+ }
}
return comments;
}
- protected Document findUpdatedDoc(Map<String, Object> message0, String guid, String sensorType)
+ /**
+ * Reformats the format of stored comments.
+ *
+ * <p>Comments are serialized differently when stored in Elasticsearch and Solr. Comments
+ * are stored as maps in Elasticsearch and JSON strings in Solr. This reformats all comments
+ * as maps, so they look the same when validation occurs in the integration tests.
+ * @param fields
+ */
+ protected static void reformatComments(Map<String, Object> fields) {
+ @SuppressWarnings("unchecked")
+ List<Object> commentValues = (List<Object>) fields.get(COMMENTS_FIELD);
+ if (commentValues != null) {
+ try {
+ List<AlertComment> comments = getComments(fields);
+ if(comments.size() > 0) {
+ // overwrite the comments field
+ List<Map<String, Object>> serializedComments = comments
+ .stream()
+ .map(AlertComment::asMap)
+ .collect(Collectors.toList());
+ fields.put(COMMENTS_FIELD, serializedComments);
+
+ } else {
+ // there are no longer any comments
+ fields.remove(COMMENTS_FIELD);
+ }
+
+ } catch (ParseException e) {
+ throw new IllegalStateException("Unable to parse comment", e);
+ }
+ }
+ }
+
+ protected Document findUpdatedDoc(Map<String, Object> expected, String guid, String sensorType)
throws InterruptedException, IOException, OriginalNotFoundException {
+ // comments are stored differently in Solr and Elasticsearch
+ reformatComments(expected);
+
for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) {
- Document doc = getDao().getLatest(guid, sensorType);
- if (doc != null && message0.equals(doc.getDocument())) {
- return doc;
+ Document found = getDao().getLatest(guid, sensorType);
+ if (found != null && expected.equals(found.getDocument())) {
+ return found;
}
if (t == MAX_RETRIES -1) {
- MapUtils.debugPrint(System.out, "Expected", message0);
- MapUtils.debugPrint(System.out, "actual", doc.getDocument());
+ MapUtils.debugPrint(System.out, "Expected", expected);
+ MapUtils.debugPrint(System.out, "Actual", found.getDocument());
}
}
throw new OriginalNotFoundException("Count not find " + guid + " after " + MAX_RETRIES + " tries");
@@ -309,7 +362,6 @@ public abstract class UpdateIntegrationTest {
}
protected abstract String getIndexName();
- protected abstract MockHTable getMockHTable();
protected abstract void addTestData(String indexName, String sensorType, List<Map<String,Object>> docs) throws Exception;
protected abstract List<Map<String,Object>> getIndexedTestData(String indexName, String sensorType) throws Exception;
}
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java
index 8148b69..d1d0707 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java
@@ -18,20 +18,15 @@
package org.apache.metron.indexing.integration;
-import static org.apache.metron.indexing.dao.HBaseDao.HBASE_CF;
-import static org.apache.metron.indexing.dao.HBaseDao.HBASE_TABLE;
-import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.HBaseDao;
import org.apache.metron.indexing.dao.IndexDao;
@@ -40,18 +35,34 @@ import org.apache.metron.indexing.dao.search.AlertComment;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.metron.indexing.dao.HBaseDao.HBASE_CF;
+import static org.apache.metron.indexing.dao.HBaseDao.HBASE_TABLE;
+import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
+
+/**
+ * An integration test for the {@link HBaseDao}.
+ */
public class HBaseDaoIntegrationTest extends UpdateIntegrationTest {
private static final String TABLE_NAME = "metron_update";
private static final String COLUMN_FAMILY = "cf";
private static final String SENSOR_TYPE = "test";
-
- private static IndexDao hbaseDao;
- private static byte[] expectedKeySerialization = new byte[] {
+ private static final byte[] expectedKeySerialization = new byte[] {
(byte)0xf5,0x53,0x76,(byte)0x96,0x67,0x3a,
(byte)0xc1,(byte)0xaf,(byte)0xff,0x41,0x33,(byte)0x9d,
(byte)0xac,(byte)0xb9,0x1a,(byte)0xb0,0x00,0x04,
@@ -60,17 +71,43 @@ public class HBaseDaoIntegrationTest extends UpdateIntegrationTest {
0x54,0x79,0x70,0x65
};
+ private static HBaseTestingUtility testUtil;
+ private static Configuration conf;
+ private static Table table;
+ private IndexDao hbaseDao;
+
+ @BeforeClass
+ public static void startHBase() throws Exception {
+ Configuration config = HBaseConfiguration.create();
+ config.set("hbase.master.hostname", "localhost");
+ config.set("hbase.regionserver.hostname", "localhost");
+
+ testUtil = new HBaseTestingUtility(config);
+ testUtil.startMiniCluster();
+
+ // create the table
+ table = testUtil.createTable(TableName.valueOf(TABLE_NAME), COLUMN_FAMILY);
+ testUtil.waitTableEnabled(table.getName());
+ }
+
+ @AfterClass
+ public static void stopHBase() throws Exception {
+ testUtil.deleteTable(table.getName());
+ testUtil.shutdownMiniCluster();
+ testUtil.cleanupTestDir();
+ }
+
@Before
- public void startHBase() throws Exception {
+ public void setup() throws Exception {
AccessConfig accessConfig = new AccessConfig();
+ accessConfig.setHbaseConnectionFactory(new HBaseConnectionFactory());
+ accessConfig.setHbaseConfiguration(testUtil.getConfiguration());
accessConfig.setMaxSearchResults(1000);
accessConfig.setMaxSearchGroups(1000);
accessConfig.setGlobalConfigSupplier(() -> new HashMap<String, Object>() {{
put(HBASE_TABLE, TABLE_NAME);
put(HBASE_CF, COLUMN_FAMILY);
}});
- MockHBaseTableProvider.addToCache(TABLE_NAME, COLUMN_FAMILY);
- accessConfig.setTableProvider(new MockHBaseTableProvider());
hbaseDao = new HBaseDao();
hbaseDao.init(accessConfig);
@@ -78,7 +115,11 @@ public class HBaseDaoIntegrationTest extends UpdateIntegrationTest {
@After
public void clearTable() throws Exception {
- MockHBaseTableProvider.clear();
+ // clear all records from the the table
+ List<Delete> deletions = new ArrayList<>();
+ for(Result r : table.getScanner(new Scan())) {
+ deletions.add(new Delete(r.getRow()));
+ }
}
/**
@@ -95,7 +136,6 @@ public class HBaseDaoIntegrationTest extends UpdateIntegrationTest {
Assert.assertArrayEquals(raw, expectedKeySerialization);
}
-
@Test
public void testKeySerialization() throws Exception {
HBaseDao.Key k = new HBaseDao.Key("guid", "sensorType");
@@ -231,11 +271,6 @@ public class HBaseDaoIntegrationTest extends UpdateIntegrationTest {
}
@Override
- protected MockHTable getMockHTable() {
- return null;
- }
-
- @Override
protected void addTestData(String indexName, String sensorType, List<Map<String, Object>> docs) {
}
diff --git a/metron-platform/metron-solr/metron-solr-common/src/main/java/org/apache/metron/solr/dao/SolrDao.java b/metron-platform/metron-solr/metron-solr-common/src/main/java/org/apache/metron/solr/dao/SolrDao.java
index 4a58808..ef68e86 100644
--- a/metron-platform/metron-solr/metron-solr-common/src/main/java/org/apache/metron/solr/dao/SolrDao.java
+++ b/metron-platform/metron-solr/metron-solr-common/src/main/java/org/apache/metron/solr/dao/SolrDao.java
@@ -91,6 +91,11 @@ public class SolrDao implements IndexDao {
}
}
+ @Override
+ public void close() {
+ // nothing to do
+ }
+
public Optional<String> getIndex(String sensorName, Optional<String> index) {
if (index.isPresent()) {
return index;
diff --git a/metron-platform/metron-solr/metron-solr-common/src/main/java/org/apache/metron/solr/dao/SolrMetaAlertDao.java b/metron-platform/metron-solr/metron-solr-common/src/main/java/org/apache/metron/solr/dao/SolrMetaAlertDao.java
index 26dc25d..1e19648 100644
--- a/metron-platform/metron-solr/metron-solr-common/src/main/java/org/apache/metron/solr/dao/SolrMetaAlertDao.java
+++ b/metron-platform/metron-solr/metron-solr-common/src/main/java/org/apache/metron/solr/dao/SolrMetaAlertDao.java
@@ -161,6 +161,16 @@ public class SolrMetaAlertDao implements MetaAlertDao {
}
@Override
+ public void close() throws IOException {
+ if(indexDao != null) {
+ indexDao.close();
+ }
+ if(solrDao != null) {
+ solrDao.close();
+ }
+ }
+
+ @Override
public void init(AccessConfig config) {
// Do nothing. We're just wrapping a child dao
}
diff --git a/metron-platform/metron-solr/metron-solr-common/src/main/java/org/apache/metron/solr/dao/SolrUtilities.java b/metron-platform/metron-solr/metron-solr-common/src/main/java/org/apache/metron/solr/dao/SolrUtilities.java
index d41b7e4..eb2ff06 100644
--- a/metron-platform/metron-solr/metron-solr-common/src/main/java/org/apache/metron/solr/dao/SolrUtilities.java
+++ b/metron-platform/metron-solr/metron-solr-common/src/main/java/org/apache/metron/solr/dao/SolrUtilities.java
@@ -64,8 +64,9 @@ public class SolrUtilities {
insertChildAlerts(solrDocument, document);
return new Document(document,
- (String) solrDocument.getFieldValue(Constants.GUID),
- (String) solrDocument.getFieldValue(Constants.SENSOR_TYPE), 0L);
+ (String) solrDocument.getFieldValue(Constants.GUID),
+ (String) solrDocument.getFieldValue(Constants.SENSOR_TYPE),
+ (Long) solrDocument.getFieldValue(Constants.Fields.TIMESTAMP.getName()));
}
protected static void reformatComments(SolrDocument solrDocument, Map<String, Object> document) {
diff --git a/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/dao/SolrMetaAlertDaoTest.java b/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/dao/SolrMetaAlertDaoTest.java
index 6817299..0adcf49 100644
--- a/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/dao/SolrMetaAlertDaoTest.java
+++ b/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/dao/SolrMetaAlertDaoTest.java
@@ -95,6 +95,10 @@ public class SolrMetaAlertDaoTest {
}
@Override
+ public void close() throws IOException {
+ }
+
+ @Override
public Document getLatest(String guid, String sensorType) {
return null;
}
diff --git a/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/dao/SolrSearchDaoTest.java b/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/dao/SolrSearchDaoTest.java
index bf9458f..f4ef16b 100644
--- a/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/dao/SolrSearchDaoTest.java
+++ b/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/dao/SolrSearchDaoTest.java
@@ -38,6 +38,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
+
import org.apache.metron.common.Constants;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.search.GetRequest;
@@ -219,7 +221,7 @@ public class SolrSearchDaoTest {
@Test
public void getLatestShouldProperlyReturnDocument() throws Exception {
- SolrDocument solrDocument = mock(SolrDocument.class);
+ SolrDocument solrDocument = createSolrDocument("bro", 123456789L);
solrSearchDao = spy(new SolrSearchDao(client, accessConfig));
when(client.getById("collection", "guid")).thenReturn(solrDocument);
@@ -237,10 +239,10 @@ public class SolrSearchDaoTest {
GetRequest broRequest2 = new GetRequest("bro-2", "bro");
GetRequest snortRequest1 = new GetRequest("snort-1", "snort");
GetRequest snortRequest2 = new GetRequest("snort-2", "snort");
- SolrDocument broSolrDoc1 = mock(SolrDocument.class);
- SolrDocument broSolrDoc2 = mock(SolrDocument.class);
- SolrDocument snortSolrDoc1 = mock(SolrDocument.class);
- SolrDocument snortSolrDoc2 = mock(SolrDocument.class);
+ SolrDocument broSolrDoc1 = createSolrDocument("bro", 12345L);
+ SolrDocument broSolrDoc2 = createSolrDocument("bro", 34567L);
+ SolrDocument snortSolrDoc1 = createSolrDocument("snort", 12345L);
+ SolrDocument snortSolrDoc2 = createSolrDocument("snort", 67890L);
Document broDoc1 = SolrUtilities.toDocument(broSolrDoc1);
Document broDoc2 = SolrUtilities.toDocument(broSolrDoc2);
Document snortDoc1 = SolrUtilities.toDocument(snortSolrDoc1);
@@ -510,5 +512,13 @@ public class SolrSearchDaoTest {
assertNull(level2GroupResults.get(1).getGroupResults());
}
-
+ private SolrDocument createSolrDocument(String sensorType, Long timestamp) {
+ SolrDocument solrDocument = new SolrDocument();
+ solrDocument.addField(SolrDao.VERSION_FIELD, 1.0);
+ solrDocument.addField(Constants.GUID, UUID.randomUUID().toString());
+ solrDocument.addField(Constants.SENSOR_TYPE, sensorType);
+ solrDocument.addField(Constants.Fields.TIMESTAMP.getName(), timestamp);
+ solrDocument.addField("ip_src_addr", "192.168.1.1");
+ return solrDocument;
+ }
}
diff --git a/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/dao/SolrUtilitiesTest.java b/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/dao/SolrUtilitiesTest.java
index f284f25..89441c0 100644
--- a/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/dao/SolrUtilitiesTest.java
+++ b/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/dao/SolrUtilitiesTest.java
@@ -30,17 +30,20 @@ public class SolrUtilitiesTest {
@Test
public void toDocumentShouldProperlyReturnDocument() throws Exception {
+ long expectedTimestamp = System.currentTimeMillis();
SolrDocument solrDocument = new SolrDocument();
solrDocument.addField(SolrDao.VERSION_FIELD, 1.0);
solrDocument.addField(Constants.GUID, "guid");
solrDocument.addField(Constants.SENSOR_TYPE, "bro");
+ solrDocument.addField(Constants.Fields.TIMESTAMP.getName(), expectedTimestamp);
solrDocument.addField("field", "value");
Document expectedDocument = new Document(new HashMap<String, Object>() {{
put("field", "value");
put(Constants.GUID, "guid");
put(Constants.SENSOR_TYPE, "bro");
- }}, "guid", "bro", 0L);
+ put(Constants.Fields.TIMESTAMP.getName(), expectedTimestamp);
+ }}, "guid", "bro", expectedTimestamp);
Document actualDocument = SolrUtilities.toDocument(solrDocument);
assertEquals(expectedDocument, actualDocument);
diff --git a/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/integration/SolrRetrieveLatestIntegrationTest.java b/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/integration/SolrRetrieveLatestIntegrationTest.java
index 8c18981..d147202 100644
--- a/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/integration/SolrRetrieveLatestIntegrationTest.java
+++ b/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/integration/SolrRetrieveLatestIntegrationTest.java
@@ -51,7 +51,7 @@ public class SolrRetrieveLatestIntegrationTest {
protected static final String TEST_COLLECTION = "test";
protected static final String TEST_SENSOR = "test_sensor";
protected static final String BRO_SENSOR = "bro";
-
+ protected final long expectedTimestamp = 123456789L;
private static IndexDao dao;
@BeforeClass
@@ -62,8 +62,7 @@ public class SolrRetrieveLatestIntegrationTest {
@Before
public void setup() throws Exception {
- solrComponent
- .addCollection(TEST_COLLECTION, "./src/test/resources/config/test/conf");
+ solrComponent.addCollection(TEST_COLLECTION, "./src/test/resources/config/test/conf");
solrComponent.addCollection(BRO_SENSOR, "./src/main/config/schema/bro");
AccessConfig accessConfig = new AccessConfig();
@@ -75,8 +74,8 @@ public class SolrRetrieveLatestIntegrationTest {
dao = new SolrDao();
dao.init(accessConfig);
- addData(BRO_SENSOR, BRO_SENSOR);
- addData(TEST_COLLECTION, TEST_SENSOR);
+ addData(BRO_SENSOR, BRO_SENSOR, expectedTimestamp);
+ addData(TEST_COLLECTION, TEST_SENSOR, expectedTimestamp);
}
@After
@@ -131,8 +130,11 @@ public class SolrRetrieveLatestIntegrationTest {
requests.add(buildGetRequest(BRO_SENSOR, 2));
Iterable<Document> actual = dao.getAllLatest(requests);
- assertTrue(Iterables.contains(actual, buildExpectedDocument(BRO_SENSOR, 1)));
- assertTrue(Iterables.contains(actual, buildExpectedDocument(BRO_SENSOR, 2)));
+ Document expected1 = buildExpectedDocument(BRO_SENSOR, 1);
+ assertTrue(Iterables.contains(actual, expected1));
+
+ Document expected2 = buildExpectedDocument(BRO_SENSOR, 2);
+ assertTrue(Iterables.contains(actual, expected2));
assertEquals(2, Iterables.size(actual));
}
@@ -179,8 +181,9 @@ public class SolrRetrieveLatestIntegrationTest {
protected Document buildExpectedDocument(String sensor, int i) {
Map<String, Object> expectedMapOne = new HashMap<>();
expectedMapOne.put("source.type", sensor);
+ expectedMapOne.put(Constants.Fields.TIMESTAMP.getName(), expectedTimestamp);
expectedMapOne.put(Constants.GUID, buildGuid(sensor, i));
- return new Document(expectedMapOne, buildGuid(sensor, i), sensor, 0L);
+ return new Document(expectedMapOne, buildGuid(sensor, i), sensor, expectedTimestamp);
}
protected GetRequest buildGetRequest(String sensor, int i) {
@@ -190,7 +193,7 @@ public class SolrRetrieveLatestIntegrationTest {
return requestOne;
}
- protected static void addData(String collection, String sensorName)
+ protected static void addData(String collection, String sensorName, Long timestamp)
throws IOException, SolrServerException {
List<Map<String, Object>> inputData = new ArrayList<>();
for (int i = 0; i < 3; ++i) {
@@ -198,6 +201,7 @@ public class SolrRetrieveLatestIntegrationTest {
HashMap<String, Object> inputMap = new HashMap<>();
inputMap.put("source.type", sensorName);
inputMap.put(Constants.GUID, name);
+ inputMap.put(Constants.Fields.TIMESTAMP.getName(), timestamp);
inputData.add(inputMap);
}
solrComponent.addDocs(collection, inputData);
diff --git a/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/integration/SolrUpdateIntegrationTest.java b/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/integration/SolrUpdateIntegrationTest.java
index d49f4b4..852ca10 100644
--- a/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/integration/SolrUpdateIntegrationTest.java
+++ b/metron-platform/metron-solr/metron-solr-common/src/test/java/org/apache/metron/solr/integration/SolrUpdateIntegrationTest.java
@@ -17,32 +17,19 @@
*/
package org.apache.metron.solr.integration;
-import static org.apache.metron.solr.SolrConstants.SOLR_ZOOKEEPER;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.HBaseDao;
-import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.MultiIndexDao;
import org.apache.metron.indexing.dao.UpdateIntegrationTest;
import org.apache.metron.indexing.dao.update.Document;
import org.apache.metron.indexing.util.IndexingCacheUtil;
import org.apache.metron.solr.client.SolrClientFactory;
import org.apache.metron.solr.dao.SolrDao;
import org.apache.metron.solr.integration.components.SolrComponent;
+import org.apache.solr.common.SolrException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -51,6 +38,15 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.metron.solr.SolrConstants.SOLR_ZOOKEEPER;
+import static org.junit.Assert.assertEquals;
+
public class SolrUpdateIntegrationTest extends UpdateIntegrationTest {
@Rule
public final ExpectedException exception = ExpectedException.none();
@@ -59,8 +55,6 @@ public class SolrUpdateIntegrationTest extends UpdateIntegrationTest {
private static final String TABLE_NAME = "modifications";
private static final String CF = "p";
- private static MockHTable table;
- private static IndexDao hbaseDao;
@BeforeClass
public static void setupBeforeClass() throws Exception {
@@ -73,28 +67,21 @@ public class SolrUpdateIntegrationTest extends UpdateIntegrationTest {
solrComponent.addCollection(SENSOR_NAME, "./src/test/resources/config/test/conf");
solrComponent.addCollection("error", "./src/main/config/schema/error");
- Configuration config = HBaseConfiguration.create();
- MockHBaseTableProvider tableProvider = new MockHBaseTableProvider();
- MockHBaseTableProvider.addToCache(TABLE_NAME, CF);
- table = (MockHTable) tableProvider.getTable(config, TABLE_NAME);
-
- hbaseDao = new HBaseDao();
- AccessConfig accessConfig = new AccessConfig();
- accessConfig.setTableProvider(tableProvider);
Map<String, Object> globalConfig = createGlobalConfig();
globalConfig.put(HBaseDao.HBASE_TABLE, TABLE_NAME);
globalConfig.put(HBaseDao.HBASE_CF, CF);
- accessConfig.setGlobalConfigSupplier(() -> globalConfig);
- accessConfig.setIndexSupplier(s -> s);
- CuratorFramework client = ConfigurationsUtils
- .getClient(solrComponent.getZookeeperUrl());
+ CuratorFramework client = ConfigurationsUtils.getClient(solrComponent.getZookeeperUrl());
client.start();
ZKConfigurationsCache cache = new ZKConfigurationsCache(client);
cache.start();
+
+ AccessConfig accessConfig = new AccessConfig();
+ accessConfig.setGlobalConfigSupplier(() -> globalConfig);
+ accessConfig.setIndexSupplier(s -> s);
accessConfig.setIndexSupplier(IndexingCacheUtil.getIndexLookupFunction(cache, "solr"));
- MultiIndexDao dao = new MultiIndexDao(hbaseDao, new SolrDao());
+ SolrDao dao = new SolrDao();
dao.init(accessConfig);
setDao(dao);
}
@@ -102,7 +89,6 @@ public class SolrUpdateIntegrationTest extends UpdateIntegrationTest {
@After
public void reset() {
solrComponent.reset();
- table.clear();
}
@AfterClass
@@ -116,11 +102,6 @@ public class SolrUpdateIntegrationTest extends UpdateIntegrationTest {
return SENSOR_NAME;
}
- @Override
- protected MockHTable getMockHTable() {
- return table;
- }
-
private static Map<String, Object> createGlobalConfig() {
return new HashMap<String, Object>() {{
put(SOLR_ZOOKEEPER, solrComponent.getZookeeperUrl());
@@ -184,8 +165,9 @@ public class SolrUpdateIntegrationTest extends UpdateIntegrationTest {
documentMap.put("error_hash", hugeString);
errorDoc = new Document(documentMap, "error", "error", 0L);
- exception.expect(IOException.class);
+ exception.expect(SolrException.class);
exception.expectMessage("Document contains at least one immense term in field=\"error_hash\"");
+
getDao().update(errorDoc, Optional.of("error"));
}
}