You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2016/04/26 16:45:54 UTC
[06/51] [partial] incubator-metron git commit: METRON-113 Project
Reorganization (merrimanr) closes apache/incubator-metron#88
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
deleted file mode 100644
index 89e13a4..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.bolt;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.Map;
-import java.util.Set;
-
-public abstract class SplitBolt<T> extends
- ConfiguredBolt {
-
- protected OutputCollector collector;
-
- public SplitBolt(String zookeeperUrl) {
- super(zookeeperUrl);
- }
-
- @Override
- public final void prepare(Map map, TopologyContext topologyContext,
- OutputCollector outputCollector) {
- super.prepare(map, topologyContext, outputCollector);
- collector = outputCollector;
- prepare(map, topologyContext);
- }
-
- @Override
- public final void execute(Tuple tuple) {
- emit(tuple, generateMessage(tuple));
- }
-
- @Override
- public final void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream("message", new Fields("key", "message"));
- for (String streamId : getStreamIds()) {
- declarer.declareStream(streamId, new Fields("key", "message"));
- }
- declarer.declareStream("error", new Fields("message"));
- declareOther(declarer);
- }
-
- public void emit(Tuple tuple, T message) {
- if (message == null) return;
- String key = getKey(tuple, message);
- collector.emit("message", tuple, new Values(key, message));
- Map<String, T> streamMessageMap = splitMessage(message);
- for (String streamId : streamMessageMap.keySet()) {
- T streamMessage = streamMessageMap.get(streamId);
- if (streamMessage == null) {
- streamMessage = getDefaultMessage(streamId);
- }
- collector.emit(streamId, new Values(key, streamMessage));
- }
- collector.ack(tuple);
- emitOther(tuple, message);
- }
-
- protected T getDefaultMessage(String streamId) {
- throw new IllegalArgumentException("Could not find a message for" +
- " stream: " + streamId);
- }
-
- public abstract void prepare(Map map, TopologyContext topologyContext);
-
- public abstract Set<String> getStreamIds();
-
- public abstract String getKey(Tuple tuple, T message);
-
- public abstract T generateMessage(Tuple tuple);
-
- public abstract Map<String, T> splitMessage(T message);
-
- public abstract void declareOther(OutputFieldsDeclarer declarer);
-
- public abstract void emitOther(Tuple tuple, T message);
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/configuration/ConfigurationManager.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/configuration/ConfigurationManager.java
deleted file mode 100644
index 0989150..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/configuration/ConfigurationManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.configuration;
-
-
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.configuration.CombinedConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.DefaultConfigurationBuilder;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-
-/**
- * Configuration manager class which loads all 'config-definition.xml' files and
- * creates a Configuration object which holds all properties from the underlying
- * configuration resource
- */
-public class ConfigurationManager {
-
- /** configuration definition file name. */
- private static String DEFAULT_CONFIG_DEFINITION_FILE_NAME = "config-definition.xml";
-
- /** Stores a map with the configuration for each path specified. */
- private static Map<String, Configuration> configurationsCache = new HashMap<String, Configuration>();
-
- /** The Constant LOGGER. */
- private static final Logger LOGGER = Logger
- .getLogger(ConfigurationManager.class);
-
- /**
- * Common method to load content of all configuration resources defined in
- * 'config-definition.xml'.
- *
- * @param configDefFilePath
- * the config def file path
- * @return Configuration
- */
- public static Configuration getConfiguration(String configDefFilePath) {
- if (configurationsCache.containsKey(configDefFilePath)) {
- return configurationsCache.get(configDefFilePath);
- }
- CombinedConfiguration configuration = null;
- synchronized (configurationsCache) {
- if (configurationsCache.containsKey(configDefFilePath)) {
- return configurationsCache.get(configDefFilePath);
- }
- DefaultConfigurationBuilder builder = new DefaultConfigurationBuilder();
- String fielPath = getConfigDefFilePath(configDefFilePath);
- LOGGER.info("loading from 'configDefFilePath' :" + fielPath);
- builder.setFile(new File(fielPath));
- try {
- configuration = builder.getConfiguration(true);
- configurationsCache.put(fielPath, configuration);
- } catch (ConfigurationException e) {
- LOGGER.info("Exception in loading property files.", e);
- }
- }
- return configuration;
- }
-
- /**
- * Removes the configuration created from a config definition file located at
- * 'configDefFilePath'.
- *
- * @param configDefFilePath
- * path to the config definition file
- */
- public static void clearConfiguration(String configDefFilePath) {
- configurationsCache.remove(configDefFilePath);
- }
-
- /**
- * Gets the configuration.
- *
- * @return the configuration
- */
- public static Configuration getConfiguration() {
- return getConfiguration(null);
- }
-
- /**
- * Returns the 'config-definition.xml' file path. 1. If the param
- * 'configDefFilePath' has a valid value, returns configDefFilePath 2. If the
- * system property key 'configDefFilePath' has a valid value, returns the
- * value 3. By default, it returns the file name 'config-definition.xml'
- *
- * @param configDefFilePath
- * given input path to the config definition file
- * @return the config def file path
- */
- private static String getConfigDefFilePath(String configDefFilePath) {
- if (StringUtils.isNotEmpty(configDefFilePath)) {
- return configDefFilePath;
- }
- return DEFAULT_CONFIG_DEFINITION_FILE_NAME;
- }
-
- /**
- * The main method.
- *
- * @param args
- * the args
- * @throws InterruptedException
- * the interrupted exception
- */
- public static void main(String[] args) throws InterruptedException {
- Configuration config = ConfigurationManager
- .getConfiguration("/Users/Sayi/Documents/config/config-definition-dpi.xml");
- System.out.println("elastic.search.cluster ="
- + config.getString("elastic.search.cluster"));
- Thread.sleep(10000);
- System.out.println("storm.topology.dpi.bolt.es-index.index.name ="
- + config.getString("storm.topology.dpi.bolt.es-index.index.name"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/dataloads/interfaces/ThreatIntelSource.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/dataloads/interfaces/ThreatIntelSource.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/dataloads/interfaces/ThreatIntelSource.java
deleted file mode 100644
index 4e87a1c..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/dataloads/interfaces/ThreatIntelSource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads.interfaces;
-
-import java.util.Iterator;
-import org.apache.commons.configuration.Configuration;
-import org.json.simple.JSONObject;
-
-public interface ThreatIntelSource extends Iterator<JSONObject> {
-
- void initializeSource(Configuration config);
- void cleanupSource();
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java
deleted file mode 100644
index d21c686..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.domain;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.utils.ConfigurationsUtils;
-
-import java.nio.file.Path;
-import java.util.Map;
-
-public class Configuration extends Configurations {
-
- protected CuratorFramework curatorFramework = null;
- private Path configFileRoot;
-
- public Configuration(CuratorFramework curatorFramework){
-
- this.curatorFramework = curatorFramework;
-
- }
-
-
- public Configuration(Path configFileRoot){
-
- this.configFileRoot = configFileRoot;
- }
-
- public void update() throws Exception {
-
- if( null != curatorFramework ) {
-
- ConfigurationsUtils.updateConfigsFromZookeeper(this, this.curatorFramework);
-
- } else {
-
- updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(configFileRoot.toAbsolutePath().toString()));
- Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(configFileRoot.toAbsolutePath().toString());
- for(String sensorType: sensorEnrichmentConfigs.keySet()) {
- updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfigs.get(sensorType));
- }
-
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
deleted file mode 100644
index 63e0f95..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.domain;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.log4j.Logger;
-import org.apache.metron.utils.JSONUtils;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class Configurations implements Serializable {
-
- private static final Logger LOG = Logger.getLogger(Configurations.class);
-
- public enum Type {
- GLOBAL, SENSOR, OTHER
- }
-
- public static final String GLOBAL_CONFIG_NAME = "global";
-
- private ConcurrentMap<String, Object> configurations = new ConcurrentHashMap<>();
-
- @SuppressWarnings("unchecked")
- public Map<String, Object> getGlobalConfig() {
- return (Map<String, Object>) configurations.get(GLOBAL_CONFIG_NAME);
- }
-
- public void updateGlobalConfig(byte[] data) throws IOException {
- updateGlobalConfig(new ByteArrayInputStream(data));
- }
-
- public void updateGlobalConfig(InputStream io) throws IOException {
- Map<String, Object> globalConfig = JSONUtils.INSTANCE.load(io, new TypeReference<Map<String, Object>>() {
- });
- updateGlobalConfig(globalConfig);
- }
-
- public void updateGlobalConfig(Map<String, Object> globalConfig) {
- configurations.put(GLOBAL_CONFIG_NAME, globalConfig);
- }
-
- public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) {
- return (SensorEnrichmentConfig) configurations.get(sensorType);
- }
-
- public void updateSensorEnrichmentConfig(String sensorType, byte[] data) throws IOException {
- updateSensorEnrichmentConfig(sensorType, new ByteArrayInputStream(data));
- }
-
- public void updateSensorEnrichmentConfig(String sensorType, InputStream io) throws IOException {
- SensorEnrichmentConfig sensorEnrichmentConfig = JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class);
- updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfig);
- }
-
- public void updateSensorEnrichmentConfig(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig) {
- configurations.put(sensorType, sensorEnrichmentConfig);
- }
-
- @SuppressWarnings("unchecked")
- public Map<String, Object> getConfig(String name) {
- return (Map<String, Object>) configurations.get(name);
- }
-
- public void updateConfig(String name, byte[] data) throws IOException {
- if (data == null) throw new IllegalStateException("config data cannot be null");
- Map<String, Object> config = JSONUtils.INSTANCE.load(new ByteArrayInputStream(data), new TypeReference<Map<String, Object>>() {});
- updateConfig(name, config);
- }
-
- public void updateConfig(String name, Map<String, Object> config) {
- configurations.put(name, config);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- Configurations that = (Configurations) o;
- return configurations.equals(that.configurations);
- }
-
- @Override
- public int hashCode() {
- return configurations.hashCode();
- }
-
- @Override
- public String toString() {
- return configurations.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
deleted file mode 100644
index 6f43739..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.domain;
-
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class Enrichment<T extends EnrichmentAdapter> implements Serializable {
-
- private String type;
- private List<String> fields;
- private T adapter;
-
- public Enrichment() {}
-
- public Enrichment(String type, T adapter) {
- this.type = type;
- this.adapter = adapter;
- }
-
-
- public List<String> getFields() {
- return fields;
- }
-
- public void setFields(List<String> fields) {
- this.fields = fields;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public T getAdapter() {
- return adapter;
- }
-
- public void setAdapter(T adapter) {
- this.adapter = adapter;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
deleted file mode 100644
index ea345ca..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.domain;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.metron.utils.JSONUtils;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class SensorEnrichmentConfig {
-
- private String index;
- private Map<String, List<String>> enrichmentFieldMap;
- private Map<String, List<String>> threatIntelFieldMap;
- private Map<String, List<String>> fieldToEnrichmentTypeMap = new HashMap<>();
- private Map<String, List<String>> fieldToThreatIntelTypeMap = new HashMap<>();
- private int batchSize;
-
- public String getIndex() {
- return index;
- }
-
- public void setIndex(String index) {
- this.index = index;
- }
-
- public Map<String, List<String>> getEnrichmentFieldMap() {
- return enrichmentFieldMap;
- }
-
- public void setEnrichmentFieldMap(Map<String, List<String>> enrichmentFieldMap) {
- this.enrichmentFieldMap = enrichmentFieldMap;
- }
-
- public Map<String, List<String>> getThreatIntelFieldMap() {
- return threatIntelFieldMap;
- }
-
- public void setThreatIntelFieldMap(Map<String, List<String>> threatIntelFieldMap) {
- this.threatIntelFieldMap = threatIntelFieldMap;
- }
-
- public Map<String, List<String>> getFieldToEnrichmentTypeMap() {
- return fieldToEnrichmentTypeMap;
- }
-
- public Map<String, List<String>> getFieldToThreatIntelTypeMap() {
- return fieldToThreatIntelTypeMap;
- }
- public void setFieldToEnrichmentTypeMap(Map<String, List<String>> fieldToEnrichmentTypeMap) {
- this.fieldToEnrichmentTypeMap = fieldToEnrichmentTypeMap;
- }
-
- public void setFieldToThreatIntelTypeMap(Map<String, List<String>> fieldToThreatIntelTypeMap) {
- this.fieldToThreatIntelTypeMap= fieldToThreatIntelTypeMap;
- }
- public int getBatchSize() {
- return batchSize;
- }
-
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
-
- public static SensorEnrichmentConfig fromBytes(byte[] config) throws IOException {
- return JSONUtils.INSTANCE.load(new String(config), SensorEnrichmentConfig.class);
- }
-
- public String toJSON() throws JsonProcessingException {
- return JSONUtils.INSTANCE.toJSON(this, true);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- SensorEnrichmentConfig that = (SensorEnrichmentConfig) o;
-
- if (getBatchSize() != that.getBatchSize()) return false;
- if (getIndex() != null ? !getIndex().equals(that.getIndex()) : that.getIndex() != null) return false;
- if (getEnrichmentFieldMap() != null ? !getEnrichmentFieldMap().equals(that.getEnrichmentFieldMap()) : that.getEnrichmentFieldMap() != null)
- return false;
- if (getThreatIntelFieldMap() != null ? !getThreatIntelFieldMap().equals(that.getThreatIntelFieldMap()) : that.getThreatIntelFieldMap() != null)
- return false;
- if (getFieldToEnrichmentTypeMap() != null ? !getFieldToEnrichmentTypeMap().equals(that.getFieldToEnrichmentTypeMap()) : that.getFieldToEnrichmentTypeMap() != null)
- return false;
- return getFieldToThreatIntelTypeMap() != null ? getFieldToThreatIntelTypeMap().equals(that.getFieldToThreatIntelTypeMap()) : that.getFieldToThreatIntelTypeMap() == null;
-
- }
-
- @Override
- public String toString() {
- return "{index=" + index + ", batchSize=" + batchSize +
- ", enrichmentFieldMap=" + enrichmentFieldMap +
- ", threatIntelFieldMap" + threatIntelFieldMap +
- ", fieldToEnrichmentTypeMap=" + fieldToEnrichmentTypeMap +
- ", fieldToThreatIntelTypeMap=" + fieldToThreatIntelTypeMap + "}";
- }
-
- @Override
- public int hashCode() {
- int result = getIndex() != null ? getIndex().hashCode() : 0;
- result = 31 * result + (getEnrichmentFieldMap() != null ? getEnrichmentFieldMap().hashCode() : 0);
- result = 31 * result + (getThreatIntelFieldMap() != null ? getThreatIntelFieldMap().hashCode() : 0);
- result = 31 * result + (getFieldToEnrichmentTypeMap() != null ? getFieldToEnrichmentTypeMap().hashCode() : 0);
- result = 31 * result + (getFieldToThreatIntelTypeMap() != null ? getFieldToThreatIntelTypeMap().hashCode() : 0);
- result = 31 * result + getBatchSize();
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConfig.java
deleted file mode 100644
index 92913d9..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConfig.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment;
-
-import com.google.common.base.Joiner;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.Constants;
-import org.apache.metron.domain.SensorEnrichmentConfig;
-import org.apache.metron.utils.ConfigurationsUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-public class EnrichmentConfig {
- public static enum Type {
- THREAT_INTEL
- ,ENRICHMENT
- }
-
- protected static final Logger _LOG = LoggerFactory.getLogger(EnrichmentConfig.class);
- public static class FieldList {
- Type type;
- Map<String, List<String>> fieldToEnrichmentTypes;
-
- public Type getType() {
- return type;
- }
-
- public void setType(Type type) {
- this.type = type;
- }
-
- public Map<String, List<String>> getFieldToEnrichmentTypes() {
- return fieldToEnrichmentTypes;
- }
-
- public void setFieldToEnrichmentTypes(Map<String, List<String>> fieldToEnrichmentTypes) {
- this.fieldToEnrichmentTypes = fieldToEnrichmentTypes;
- }
- }
- public String zkQuorum;
- public Map<String, FieldList> sensorToFieldList;
-
- public String getZkQuorum() {
- return zkQuorum;
- }
-
- public void setZkQuorum(String zkQuorum) {
- this.zkQuorum = zkQuorum;
- }
-
- public Map<String, FieldList> getSensorToFieldList() {
- return sensorToFieldList;
- }
-
- public void setSensorToFieldList(Map<String, FieldList> sensorToFieldList) {
- this.sensorToFieldList = sensorToFieldList;
- }
-
- public void updateSensorConfigs( ) throws Exception {
- CuratorFramework client = ConfigurationsUtils.getClient(getZkQuorum());
- try {
- client.start();
- updateSensorConfigs(new ZKSourceConfigHandler(client), sensorToFieldList);
- }
- finally {
- client.close();
- }
- }
-
- public static interface SourceConfigHandler {
- SensorEnrichmentConfig readConfig(String sensor) throws Exception;
- void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception;
- }
-
- public static class ZKSourceConfigHandler implements SourceConfigHandler {
- CuratorFramework client;
- public ZKSourceConfigHandler(CuratorFramework client) {
- this.client = client;
- }
- @Override
- public SensorEnrichmentConfig readConfig(String sensor) throws Exception {
- return SensorEnrichmentConfig.fromBytes(ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(sensor, client));
- }
-
- @Override
- public void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception {
- ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensor, config.toJSON().getBytes(), client);
- }
- }
-
- public static void updateSensorConfigs( SourceConfigHandler scHandler
- , Map<String, FieldList> sensorToFieldList
- ) throws Exception
- {
- Map<String, SensorEnrichmentConfig> sourceConfigsChanged = new HashMap<>();
- for (Map.Entry<String, FieldList> kv : sensorToFieldList.entrySet()) {
- SensorEnrichmentConfig config = sourceConfigsChanged.get(kv.getKey());
- if(config == null) {
- config = scHandler.readConfig(kv.getKey());
- if(_LOG.isDebugEnabled()) {
- _LOG.debug(config.toJSON());
- }
- }
- Map<String, List<String> > fieldMap = null;
- Map<String, List<String> > fieldToTypeMap = null;
- List<String> fieldList = null;
- if(kv.getValue().type == Type.THREAT_INTEL) {
- fieldMap = config.getThreatIntelFieldMap();
- if(fieldMap!= null) {
- fieldList = fieldMap.get(Constants.SIMPLE_HBASE_THREAT_INTEL);
- }
- if(fieldList == null) {
- fieldList = new ArrayList<>();
- fieldMap.put(Constants.SIMPLE_HBASE_THREAT_INTEL, fieldList);
- }
- fieldToTypeMap = config.getFieldToThreatIntelTypeMap();
- if(fieldToTypeMap == null) {
- fieldToTypeMap = new HashMap<>();
- config.setFieldToThreatIntelTypeMap(fieldToTypeMap);
- }
- }
- else if(kv.getValue().type == Type.ENRICHMENT) {
- fieldMap = config.getEnrichmentFieldMap();
- if(fieldMap!= null) {
- fieldList = fieldMap.get(Constants.SIMPLE_HBASE_ENRICHMENT);
- }
- if(fieldList == null) {
- fieldList = new ArrayList<>();
- fieldMap.put(Constants.SIMPLE_HBASE_ENRICHMENT, fieldList);
- }
- fieldToTypeMap = config.getFieldToEnrichmentTypeMap();
- if(fieldToTypeMap == null) {
- fieldToTypeMap = new HashMap<>();
- config.setFieldToEnrichmentTypeMap(fieldToTypeMap);
- }
- }
- if(fieldToTypeMap == null || fieldMap == null) {
- _LOG.debug("fieldToTypeMap is null or fieldMap is null, so skipping");
- continue;
- }
- //Add the additional fields to the field list associated with the hbase adapter
- {
- HashSet<String> fieldSet = new HashSet<>(fieldList);
- List<String> additionalFields = new ArrayList<>();
- for (String field : kv.getValue().getFieldToEnrichmentTypes().keySet()) {
- if (!fieldSet.contains(field)) {
- additionalFields.add(field);
- }
- }
- //adding only the ones that we don't already have to the field list
- if (additionalFields.size() > 0) {
- _LOG.debug("Adding additional fields: " + Joiner.on(',').join(additionalFields));
- fieldList.addAll(additionalFields);
- sourceConfigsChanged.put(kv.getKey(), config);
- }
- }
- //Add the additional enrichment types to the mapping between the fields
- {
- for(Map.Entry<String, List<String>> fieldToType : kv.getValue().getFieldToEnrichmentTypes().entrySet()) {
- String field = fieldToType.getKey();
- final HashSet<String> types = new HashSet<>(fieldToType.getValue());
- int sizeBefore = 0;
- if(fieldToTypeMap.containsKey(field)) {
- List<String> typeList = fieldToTypeMap.get(field);
- sizeBefore = new HashSet<>(typeList).size();
- types.addAll(typeList);
- }
- int sizeAfter = types.size();
- boolean changed = sizeBefore != sizeAfter;
- if(changed) {
- fieldToTypeMap.put(field, new ArrayList<String>() {{
- addAll(types);
- }});
- sourceConfigsChanged.put(kv.getKey(), config);
- }
- }
- }
- }
- for(Map.Entry<String, SensorEnrichmentConfig> kv : sourceConfigsChanged.entrySet()) {
- scHandler.persistConfig(kv.getKey(), kv.getValue());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java
deleted file mode 100644
index 4f7be3b..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment;
-
-public class EnrichmentConstants {
-
-
-
- public static final String INDEX_NAME = "index.name";
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
deleted file mode 100644
index 28f9956..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.interfaces;
-
-import org.json.simple.JSONObject;
-
-public interface EnrichmentAdapter<T>
-{
- void logAccess(T value);
- JSONObject enrich(T value);
- boolean initializeAdapter();
- void cleanup();
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
deleted file mode 100644
index 35da040..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.Serializable;
-
-/**
- * Created by cstella on 1/29/16.
- */
-public abstract class Connector {
- protected TableConfig tableConf;
- protected String _quorum;
- protected String _port;
-
- public Connector(final TableConfig conf, String _quorum, String _port) throws IOException {
- this.tableConf = conf;
- this._quorum = _quorum;
- this._port = _port;
- }
- public abstract void put(Put put) throws IOException;
- public abstract void close();
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
deleted file mode 100644
index 6caa016..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-
-
-import java.io.IOException;
-import java.util.Map;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.log4j.Logger;
-import org.json.simple.JSONObject;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import org.apache.metron.helpers.topology.ErrorUtils;
-
-/**
- * A Storm bolt for putting data into HBase.
- * <p>
- * By default works in batch mode by enabling HBase's client-side write buffer. Enabling batch mode
- * is recommended for high throughput, but it can be disabled in {@link TupleTableConfig}.
- * <p>
- * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered in the
- * classpath
- * @see TupleTableConfig
- * @see HTableConnector
- */
-@SuppressWarnings("serial")
-public class HBaseBolt implements IRichBolt {
- private static final Logger LOG = Logger.getLogger(HBaseBolt.class);
- private static final String DEFAULT_ZK_PORT = "2181";
-
- protected OutputCollector collector;
- protected TupleTableConfig conf;
- protected boolean autoAck = true;
- protected Connector connector;
- private String _quorum;
- private String _port;
-
- public HBaseBolt(TupleTableConfig conf, String quorum, String port) {
- this.conf = conf;
- _quorum = quorum;
- _port = port;
- }
- public HBaseBolt(final TupleTableConfig conf, String zkConnectString) throws IOException {
- this(conf, zkConnectStringToHosts(zkConnectString), zkConnectStringToPort(zkConnectString));
- }
- public static String zkConnectStringToHosts(String connString) {
- Iterable<String> hostPortPairs = Splitter.on(',').split(connString);
- return Joiner.on(',').join(Iterables.transform(hostPortPairs, new Function<String, String>() {
-
- @Override
- public String apply(String hostPortPair) {
- return Iterables.getFirst(Splitter.on(':').split(hostPortPair), "");
- }
- }));
- }
- public static String zkConnectStringToPort(String connString) {
- String hostPortPair = Iterables.getFirst(Splitter.on(",").split(connString), "");
- return Iterables.getLast(Splitter.on(":").split(hostPortPair),DEFAULT_ZK_PORT);
- }
-
-
- public Connector createConnector() throws IOException{
- initialize();
- return new HTableConnector(conf, _quorum, _port);
- }
-
- public void initialize() {
- TupleTableConfig hbaseBoltConfig = conf;
- String allColumnFamiliesColumnQualifiers = conf.getFields();
- String[] tokenizedColumnFamiliesWithColumnQualifiers = StringUtils
- .split(allColumnFamiliesColumnQualifiers, "\\|");
- for (String tokenizedColumnFamilyWithColumnQualifiers : tokenizedColumnFamiliesWithColumnQualifiers) {
- String[] cfCqTokens = StringUtils.split( tokenizedColumnFamilyWithColumnQualifiers, ":");
- String columnFamily = cfCqTokens[0];
- String[] columnQualifiers = StringUtils.split(cfCqTokens[1], ",");
- for (String columnQualifier : columnQualifiers) {
- hbaseBoltConfig.addColumn(columnFamily, columnQualifier);
- }
- setAutoAck(true);
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("rawtypes")
-
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
-
- try {
- if(connector == null) {
- this.connector = createConnector();
- }
-
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- LOG.info("Preparing HBaseBolt for table: " + this.conf.getTableName());
- }
-
- /** {@inheritDoc} */
-
- public void execute(Tuple input) {
- try {
- Put p = conf.getPutFromTuple(input);
- this.connector.put(p);
- } catch (IOException ex) {
-
- JSONObject error = ErrorUtils.generateErrorMessage(
- "Alerts problem: " + input.toString(), ex);
- collector.emit("error", new Values(error));
-
- throw new RuntimeException(ex);
- }
-
- if (this.autoAck) {
- this.collector.ack(input);
- }
- }
-
- /** {@inheritDoc} */
-
- public void cleanup() {
- this.connector.close();
- }
-
- /** {@inheritDoc} */
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream("error", new Fields("HBase"));
- }
-
- /** {@inheritDoc} */
-
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- /**
- * @return the autoAck
- */
- public boolean isAutoAck() {
- return autoAck;
- }
-
- /**
- * @param autoAck the autoAck to set
- */
- public void setAutoAck(boolean autoAck) {
- this.autoAck = autoAck;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseStreamPartitioner.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseStreamPartitioner.java
deleted file mode 100644
index 519f76c..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseStreamPartitioner.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HTable;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-
-public class HBaseStreamPartitioner implements CustomStreamGrouping {
-
- private static final long serialVersionUID = -148324019395976092L;
- private String[] regionStartKeys = { "0" };
- private Map<String, String> regionStartKeyRegionNameMap = new HashMap<String, String>();
-
- private List<Integer> targetTasks = null;
- private int targetTasksSize = 0;
- private int rowKeyFieldIndex = 0;
- private String tableName = null;
- private long regionCheckTime = 0;
- private int regionInforRefreshIntervalInMins = 60;
- private int regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;
-
- HTable hTable = null;;
-
-
- public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
-
- System.out.println("preparing HBaseStreamPartitioner for streamId " + stream.get_streamId());
- this.targetTasks = targetTasks;
- this.targetTasksSize = this.targetTasks.size();
-
- Configuration conf = HBaseConfiguration.create();
- try {
- hTable = new HTable(conf, tableName);
- refreshRegionInfo(tableName);
-
- System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
-
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
-
- public void prepare() {
-
- System.out.println("preparing HBaseStreamPartitioner for streamId " );
-
- Configuration conf = HBaseConfiguration.create();
- try {
- hTable = new HTable(conf, tableName);
- refreshRegionInfo(tableName);
-
- System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
-
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
-
- public HBaseStreamPartitioner(String tableName, int rowKeyFieldIndex, int regionInforRefreshIntervalInMins) {
- System.out.println("Created HBaseStreamPartitioner ");
- this.rowKeyFieldIndex = rowKeyFieldIndex;
- this.tableName = tableName;
- this.regionInforRefreshIntervalInMins = regionInforRefreshIntervalInMins;
- this.regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;
-
- }
-
-
- public List<Integer> chooseTasks(int taskId, List<Object> values) {
- List<Integer> choosenTasks = null;
- System.out.println("Choosing task for taskId " + taskId + " and values " + values);
-
- if (regionInforRefreshIntervalInMillis > (System.currentTimeMillis() - regionCheckTime)) {
- try {
- refreshRegionInfo(tableName);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- int regionIndex = getRegionIndex((String) values.get(rowKeyFieldIndex));
-
- if (regionIndex < targetTasksSize) {
- choosenTasks = Arrays.asList(regionIndex);
-
- } else {
- choosenTasks = Arrays.asList(regionIndex % targetTasksSize);
- }
- System.out.println("Choosen tasks are " + choosenTasks);
-
- return choosenTasks;
-
-
- }
-
-
-
- public int getRegionIndex(String key) {
- int index = Arrays.binarySearch(regionStartKeys, key);
- if (index < -1) {
- index = (index + 2) * -1;
- } else if (index == -1) {
- index = 0;
- }
-
- return index;
- }
-
- private void refreshRegionInfo(String tableName) throws IOException {
-
- System.out.println("in refreshRegionInfo ");
-
- Map<HRegionInfo, ServerName> regionMap = hTable.getRegionLocations();
-
- synchronized (regionStartKeys) {
- synchronized (regionStartKeyRegionNameMap) {
- regionStartKeys = new String[regionMap.size()];
- int index = 0;
- String startKey = null;
- regionStartKeyRegionNameMap.clear();
- for (HRegionInfo regionInfo : regionMap.keySet()) {
- startKey = new String(regionInfo.getStartKey());
- regionStartKeyRegionNameMap.put(startKey, regionInfo.getRegionNameAsString());
- regionStartKeys[index] = startKey;
- index++;
- }
-
- Arrays.sort(regionStartKeys);
- regionCheckTime = System.currentTimeMillis();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
deleted file mode 100644
index d1a9327..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.hbase;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.log4j.Logger;
-
-import backtype.storm.generated.Bolt;
-
-import javax.annotation.Nullable;
-
-/**
- * HTable connector for Storm {@link Bolt}
- * <p>
- * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered in the
- * classpath
- */
-@SuppressWarnings("serial")
-public class HTableConnector extends Connector implements Serializable{
- private static final Logger LOG = Logger.getLogger(HTableConnector.class);
- private Configuration conf;
- protected HTableInterface table;
- private String tableName;
- private String connectorImpl;
-
-
- /**
- * Initialize HTable connection
- * @param conf The {@link TupleTableConfig}
- * @throws IOException
- */
- public HTableConnector(final TableConfig conf, String _quorum, String _port) throws IOException {
- super(conf, _quorum, _port);
- this.connectorImpl = conf.getConnectorImpl();
- this.tableName = conf.getTableName();
- this.conf = HBaseConfiguration.create();
-
- if(_quorum != null && _port != null)
- {
- this.conf.set("hbase.zookeeper.quorum", _quorum);
- this.conf.set("hbase.zookeeper.property.clientPort", _port);
- }
-
- LOG.info(String.format("Initializing connection to HBase table %s at %s", tableName,
- this.conf.get("hbase.rootdir")));
-
- try {
- this.table = getTableProvider().getTable(this.conf, this.tableName);
- } catch (IOException ex) {
- throw new IOException("Unable to establish connection to HBase table " + this.tableName, ex);
- }
-
- if (conf.isBatch()) {
- // Enable client-side write buffer
- this.table.setAutoFlush(false, true);
- LOG.info("Enabled client-side write buffer");
- }
-
- // If set, override write buffer size
- if (conf.getWriteBufferSize() > 0) {
- try {
- this.table.setWriteBufferSize(conf.getWriteBufferSize());
-
- LOG.info("Setting client-side write buffer to " + conf.getWriteBufferSize());
- } catch (IOException ex) {
- LOG.error("Unable to set client-side write buffer size for HBase table " + this.tableName,
- ex);
- }
- }
-
- // Check the configured column families exist
- for (String cf : conf.getColumnFamilies()) {
- if (!columnFamilyExists(cf)) {
- throw new RuntimeException(String.format(
- "HBase table '%s' does not have column family '%s'", conf.getTableName(), cf));
- }
- }
- }
-
- protected TableProvider getTableProvider() throws IOException {
- if(connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
- return new HTableProvider();
- }
- else {
- try {
- Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(connectorImpl);
- return clazz.getConstructor().newInstance();
- } catch (InstantiationException e) {
- throw new IOException("Unable to instantiate connector.", e);
- } catch (IllegalAccessException e) {
- throw new IOException("Unable to instantiate connector: illegal access", e);
- } catch (InvocationTargetException e) {
- throw new IOException("Unable to instantiate connector", e);
- } catch (NoSuchMethodException e) {
- throw new IOException("Unable to instantiate connector: no such method", e);
- } catch (ClassNotFoundException e) {
- throw new IOException("Unable to instantiate connector: class not found", e);
- }
- }
- }
-
- /**
- * Checks to see if table contains the given column family
- * @param columnFamily The column family name
- * @return boolean
- * @throws IOException
- */
- private boolean columnFamilyExists(final String columnFamily) throws IOException {
- return this.table.getTableDescriptor().hasFamily(Bytes.toBytes(columnFamily));
- }
-
- /**
- * @return the table
- */
- public HTableInterface getTable() {
- return table;
- }
-
- @Override
- public void put(Put put) throws IOException {
- table.put(put);
- }
-
- /**
- * Close the table
- */
- @Override
- public void close() {
- try {
- this.table.close();
- } catch (IOException ex) {
- LOG.error("Unable to close connection to HBase table " + tableName, ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
deleted file mode 100644
index e454f04..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-
-import java.io.IOException;
-
-public class HTableProvider implements TableProvider {
- @Override
- public HTableInterface getTable(Configuration config, String tableName) throws IOException {
- return new HTable(config, tableName);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java
deleted file mode 100644
index de2e929..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-
-public class TableConfig implements Serializable {
- static final long serialVersionUID = -1L;
- private String tableName;
- private boolean batch = true;
- protected Map<String, Set<String>> columnFamilies = new HashMap<>();
- private long writeBufferSize = 0L;
- private String connectorImpl;
-
- public TableConfig() {
-
- }
-
- public TableConfig(String tableName) {
- this.tableName = tableName;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public TableConfig withConnectorImpl(String impl) {
- connectorImpl = impl;
- return this;
- }
-
- public TableConfig withTable(String table) {
- this.tableName = table;
- return this;
- }
-
- public TableConfig withBatch(Boolean isBatch) {
- this.batch = isBatch;
- return this;
- }
-
- public String getConnectorImpl() {
- return connectorImpl;
- }
-
- /**
- * @return Whether batch mode is enabled
- */
- public boolean isBatch() {
- return batch;
- }
-
- /**
- * @param batch
- * Whether to enable HBase's client-side write buffer.
- * <p>
- * When enabled your bolt will store put operations locally until the
- * write buffer is full, so they can be sent to HBase in a single RPC
- * call. When disabled each put operation is effectively an RPC and
- * is sent straight to HBase. As your bolt can process thousands of
- * values per second it is recommended that the write buffer is
- * enabled.
- * <p>
- * Enabled by default
- */
- public void setBatch(boolean batch) {
- this.batch = batch;
- }
- /**
- * @param writeBufferSize
- * Overrides the client-side write buffer size.
- * <p>
- * By default the write buffer size is 2 MB (2097152 bytes). If you
- * are storing larger data, you may want to consider increasing this
- * value to allow your bolt to efficiently group together a larger
- * number of records per RPC
- * <p>
- * Overrides the write buffer size you have set in your
- * hbase-site.xml e.g. <code>hbase.client.write.buffer</code>
- */
- public void setWriteBufferSize(long writeBufferSize) {
- this.writeBufferSize = writeBufferSize;
- }
-
- /**
- * @return the writeBufferSize
- */
- public long getWriteBufferSize() {
- return writeBufferSize;
- }
- /**
- * @return A Set of configured column families
- */
- public Set<String> getColumnFamilies() {
- return this.columnFamilies.keySet();
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java
deleted file mode 100644
index dc0569e..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-public interface TableProvider extends Serializable {
- HTableInterface getTable(Configuration config, String tableName) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
deleted file mode 100644
index a9ec20a..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.TreeMap;
-
-import com.google.common.base.Joiner;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.log4j.Logger;
-
-/**
- * Configuration for Storm {@link Tuple} to HBase serialization.
- */
-@SuppressWarnings("serial")
-public class TupleTableConfig extends TableConfig implements Serializable {
- private static final Logger LOG = Logger.getLogger(TupleTableConfig.class);
- static final long serialVersionUID = -1L;
- public static final long DEFAULT_INCREMENT = 1L;
-
- protected String tupleRowKeyField;
- protected String tupleTimestampField;
- protected Durability durability = Durability.USE_DEFAULT;
- private String fields;
-
- /**
- * Initialize configuration
- *
- * @param table
- * The HBase table name
- * @param rowKeyField
- * The {@link Tuple} field used to set the rowKey
- */
- public TupleTableConfig(final String table, final String rowKeyField) {
- super(table);
- this.tupleRowKeyField = rowKeyField;
- this.tupleTimestampField = "";
- this.columnFamilies = new HashMap<String, Set<String>>();
- }
-
- /**
- * Initialize configuration
- *
- * @param table
- * The HBase table name
- * @param rowKeyField
- * The {@link Tuple} field used to set the rowKey
- * @param timestampField
- * The {@link Tuple} field used to set the timestamp
- */
- public TupleTableConfig(final String table, final String rowKeyField, final String timestampField) {
- super(table);
- this.tupleRowKeyField = rowKeyField;
- this.tupleTimestampField = timestampField;
- this.columnFamilies = new HashMap<String, Set<String>>();
- }
-
- public TupleTableConfig() {
- super(null);
- this.columnFamilies = new HashMap<String, Set<String>>();
- }
-
-
-
- public TupleTableConfig withRowKeyField(String rowKeyField) {
- this.tupleRowKeyField = rowKeyField;
- return this;
- }
-
- public TupleTableConfig withTimestampField(String timestampField) {
- this.tupleTimestampField = timestampField;
- return this;
- }
-
- public TupleTableConfig withFields(String fields) {
- this.fields = fields;
- return this;
- }
-
-
-
- public String getFields() {
- return fields;
- }
-
-
-
- /**
- * Add column family and column qualifier to be extracted from tuple
- *
- * @param columnFamily
- * The column family name
- * @param columnQualifier
- * The column qualifier name
- */
- public void addColumn(final String columnFamily, final String columnQualifier) {
- Set<String> columns = this.columnFamilies.get(columnFamily);
-
- if (columns == null) {
- columns = new HashSet<String>();
- }
- columns.add(columnQualifier);
-
- this.columnFamilies.put(columnFamily, columns);
- }
-
- /**
- * Creates a HBase {@link Put} from a Storm {@link Tuple}
- *
- * @param tuple
- * The {@link Tuple}
- * @return {@link Put}
- */
- public Put getPutFromTuple(final Tuple tuple) throws IOException{
- byte[] rowKey = null;
- try {
- rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
- }
- catch(IllegalArgumentException iae) {
- throw new IOException("Unable to retrieve " + tupleRowKeyField + " from " + tuple + " [ " + Joiner.on(',').join(tuple.getFields()) + " ]", iae);
- }
-
- long ts = 0;
- if (!tupleTimestampField.equals("")) {
- ts = tuple.getLongByField(tupleTimestampField);
- }
-
- Put p = new Put(rowKey);
-
- p.setDurability(durability);
-
- if (columnFamilies.size() > 0) {
- for (String cf : columnFamilies.keySet()) {
- byte[] cfBytes = Bytes.toBytes(cf);
- for (String cq : columnFamilies.get(cf)) {
- byte[] cqBytes = Bytes.toBytes(cq);
- byte[] val = tuple.getBinaryByField(cq);
-
- if (ts > 0) {
- p.add(cfBytes, cqBytes, ts, val);
- } else {
- p.add(cfBytes, cqBytes, val);
- }
- }
- }
- }
-
- return p;
- }
-
- /**
- * Creates a HBase {@link Increment} from a Storm {@link Tuple}
- *
- * @param tuple
- * The {@link Tuple}
- * @param increment
- * The amount to increment the counter by
- * @return {@link Increment}
- */
- public Increment getIncrementFromTuple(final Tuple tuple, final long increment) {
- byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
-
- Increment inc = new Increment(rowKey);
- inc.setDurability(durability);
-
- if (columnFamilies.size() > 0) {
- for (String cf : columnFamilies.keySet()) {
- byte[] cfBytes = Bytes.toBytes(cf);
- for (String cq : columnFamilies.get(cf)) {
- byte[] val;
- try {
- val = Bytes.toBytes(tuple.getStringByField(cq));
- } catch (IllegalArgumentException ex) {
- // if cq isn't a tuple field, use cq for counter instead of tuple
- // value
- val = Bytes.toBytes(cq);
- }
- inc.addColumn(cfBytes, val, increment);
- }
- }
- }
-
- return inc;
- }
-
- /**
- * Increment the counter for the given family and column by the specified
- * amount
- * <p>
- * If the family and column already exist in the Increment the counter value
- * is incremented by the specified amount rather than overridden, as it is in
- * HBase's {@link Increment#addColumn(byte[], byte[], long)} method
- *
- * @param inc
- * The {@link Increment} to update
- * @param family
- * The column family
- * @param qualifier
- * The column qualifier
- * @param amount
- * The amount to increment the counter by
- */
- public static void addIncrement(Increment inc, final byte[] family, final byte[] qualifier, final Long amount) {
-
- NavigableMap<byte[], Long> set = inc.getFamilyMapOfLongs().get(family);
- if (set == null) {
- set = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
- }
-
- // If qualifier exists, increment amount
- Long counter = set.get(qualifier);
- if (counter == null) {
- counter = 0L;
- }
- set.put(qualifier, amount + counter);
-
- inc.getFamilyMapOfLongs().put(family, set);
- }
-
-
-
- /**
- * @param durability
- * Sets whether to write to HBase's edit log.
- * <p>
- * Setting to false will mean fewer operations to perform when
- * writing to HBase and hence better performance, but changes that
- * haven't been flushed to a store file will be lost in the event of
- * HBase failure
- * <p>
- * Enabled by default
- */
- public void setDurability(Durability durability) {
- this.durability = durability;
- }
-
-
- public Durability getDurability() {
- return durability;
- }
-
-
-
- /**
- * @return the tupleRowKeyField
- */
- public String getTupleRowKeyField() {
- return tupleRowKeyField;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java
deleted file mode 100644
index c58dc22..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.hbase.converters;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.reference.lookup.LookupKey;
-import org.apache.metron.reference.lookup.LookupValue;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.*;
-
-
-public abstract class AbstractConverter<KEY_T extends LookupKey, VALUE_T extends LookupValue> implements HbaseConverter<KEY_T,VALUE_T> {
- public static Function<Cell, Map.Entry<byte[], byte[]>> CELL_TO_ENTRY = new Function<Cell, Map.Entry<byte[], byte[]>>() {
-
- @Nullable
- @Override
- public Map.Entry<byte[], byte[]> apply(@Nullable Cell cell) {
- return new AbstractMap.SimpleEntry<>(cell.getQualifier(), cell.getValue());
- }
- };
- @Override
- public Put toPut(String columnFamily, KEY_T key, VALUE_T values) throws IOException {
- Put put = new Put(key.toBytes());
- byte[] cf = Bytes.toBytes(columnFamily);
- for(Map.Entry<byte[], byte[]> kv : values.toColumns()) {
- put.add(cf, kv.getKey(), kv.getValue());
- }
- return put;
- }
-
- public LookupKV<KEY_T, VALUE_T> fromPut(Put put, String columnFamily, KEY_T key, VALUE_T value) throws IOException {
- key.fromBytes(put.getRow());
- byte[] cf = Bytes.toBytes(columnFamily);
- value.fromColumns(Iterables.transform(put.getFamilyCellMap().get(cf), CELL_TO_ENTRY));
- return new LookupKV<>(key, value);
- }
-
- @Override
- public Result toResult(String columnFamily, KEY_T key, VALUE_T values) throws IOException {
- Put put = toPut(columnFamily, key, values);
- return Result.create(put.getFamilyCellMap().get(Bytes.toBytes(columnFamily)));
- }
-
- public LookupKV<KEY_T, VALUE_T> fromResult(Result result, String columnFamily, KEY_T key, VALUE_T value) throws IOException {
- if(result == null || result.getRow() == null) {
- return null;
- }
- key.fromBytes(result.getRow());
- byte[] cf = Bytes.toBytes(columnFamily);
- NavigableMap<byte[], byte[]> cols = result.getFamilyMap(cf);
- value.fromColumns(cols.entrySet());
- return new LookupKV<>(key, value);
- }
- @Override
- public Get toGet(String columnFamily, KEY_T key) {
- Get ret = new Get(key.toBytes());
- ret.addFamily(Bytes.toBytes(columnFamily));
- return ret;
- }
-
- public static Iterable<Map.Entry<byte[], byte[]>> toEntries(byte[]... kvs) {
- if(kvs.length % 2 != 0) {
- throw new IllegalStateException("Must be an even size");
- }
- List<Map.Entry<byte[], byte[]>> ret = new ArrayList<>(kvs.length/2);
- for(int i = 0;i < kvs.length;i += 2) {
- ret.add(new AbstractMap.SimpleImmutableEntry<>(kvs[i], kvs[i+1])) ;
- }
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java
deleted file mode 100644
index 449d9cf..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.hbase.converters;
-
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.reference.lookup.LookupKey;
-import org.apache.metron.reference.lookup.LookupValue;
-
-import java.io.IOException;
-
-public interface HbaseConverter<KEY_T extends LookupKey, VALUE_T extends LookupValue> {
- Put toPut(String columnFamily, KEY_T key, VALUE_T values) throws IOException;
-
- LookupKV<KEY_T, VALUE_T> fromPut(Put put, String columnFamily) throws IOException;
-
- Result toResult(String columnFamily, KEY_T key, VALUE_T values) throws IOException;
-
- LookupKV<KEY_T, VALUE_T> fromResult(Result result, String columnFamily) throws IOException;
-
- Get toGet(String columnFamily, KEY_T key);
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentConverter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentConverter.java
deleted file mode 100644
index a044498..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentConverter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase.converters.enrichment;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.metron.hbase.converters.AbstractConverter;
-import org.apache.metron.reference.lookup.LookupKV;
-
-import java.io.IOException;
-
-public class EnrichmentConverter extends AbstractConverter<EnrichmentKey, EnrichmentValue> {
-
- @Override
- public LookupKV<EnrichmentKey, EnrichmentValue> fromPut(Put put, String columnFamily) throws IOException {
- return fromPut(put, columnFamily, new EnrichmentKey(), new EnrichmentValue());
- }
-
- @Override
- public LookupKV<EnrichmentKey, EnrichmentValue> fromResult(Result result, String columnFamily) throws IOException {
- return fromResult(result, columnFamily, new EnrichmentKey(), new EnrichmentValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentHelper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentHelper.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentHelper.java
deleted file mode 100644
index a3d1b66..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentHelper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase.converters.enrichment;
-
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.metron.reference.lookup.LookupKV;
-
-import java.io.IOException;
-
-public enum EnrichmentHelper {
- INSTANCE;
- EnrichmentConverter converter = new EnrichmentConverter();
-
- public void load(HTableInterface table, String cf, Iterable<LookupKV<EnrichmentKey, EnrichmentValue>> results) throws IOException {
- for(LookupKV<EnrichmentKey, EnrichmentValue> result : results) {
- Put put = converter.toPut(cf, result.getKey(), result.getValue());
- table.put(put);
- }
- }
-}