You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2016/04/26 16:45:54 UTC

[06/51] [partial] incubator-metron git commit: METRON-113 Project Reorganization (merrimanr) closes apache/incubator-metron#88

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
deleted file mode 100644
index 89e13a4..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.bolt;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.Map;
-import java.util.Set;
-
-public abstract class SplitBolt<T> extends
-        ConfiguredBolt {
-
-  protected OutputCollector collector;
-
-  public SplitBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
-  }
-
-  @Override
-  public final void prepare(Map map, TopologyContext topologyContext,
-                       OutputCollector outputCollector) {
-    super.prepare(map, topologyContext, outputCollector);
-    collector = outputCollector;
-    prepare(map, topologyContext);
-  }
-
-  @Override
-  public final void execute(Tuple tuple) {
-    emit(tuple, generateMessage(tuple));
-  }
-
-  @Override
-  public final void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declareStream("message", new Fields("key", "message"));
-    for (String streamId : getStreamIds()) {
-      declarer.declareStream(streamId, new Fields("key", "message"));
-    }
-    declarer.declareStream("error", new Fields("message"));
-    declareOther(declarer);
-  }
-
-  public void emit(Tuple tuple, T message) {
-    if (message == null) return;
-    String key = getKey(tuple, message);
-    collector.emit("message", tuple, new Values(key, message));
-    Map<String, T> streamMessageMap = splitMessage(message);
-    for (String streamId : streamMessageMap.keySet()) {
-      T streamMessage = streamMessageMap.get(streamId);
-      if (streamMessage == null) {
-        streamMessage = getDefaultMessage(streamId);
-      }
-      collector.emit(streamId, new Values(key, streamMessage));
-    }
-    collector.ack(tuple);
-    emitOther(tuple, message);
-  }
-
-  protected T getDefaultMessage(String streamId) {
-    throw new IllegalArgumentException("Could not find a message for" +
-            " stream: " + streamId);
-  }
-
-  public abstract void prepare(Map map, TopologyContext topologyContext);
-
-  public abstract Set<String> getStreamIds();
-
-  public abstract String getKey(Tuple tuple, T message);
-
-  public abstract T generateMessage(Tuple tuple);
-
-  public abstract Map<String, T> splitMessage(T message);
-
-  public abstract void declareOther(OutputFieldsDeclarer declarer);
-
-  public abstract void emitOther(Tuple tuple, T message);
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/configuration/ConfigurationManager.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/configuration/ConfigurationManager.java
deleted file mode 100644
index 0989150..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/configuration/ConfigurationManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.configuration;
-
-
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.configuration.CombinedConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.DefaultConfigurationBuilder;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-
-/**
- * Configuration manager class which loads all 'config-definition.xml' files and
- * creates a Configuration object which holds all properties from the underlying
- * configuration resource
- */
-public class ConfigurationManager {
-
-  /** configuration definition file name. */
-  private static String DEFAULT_CONFIG_DEFINITION_FILE_NAME = "config-definition.xml";
-
-  /** Stores a map with the configuration for each path specified. */
-  private static Map<String, Configuration> configurationsCache = new HashMap<String, Configuration>();
-
-  /** The Constant LOGGER. */
-  private static final Logger LOGGER = Logger
-      .getLogger(ConfigurationManager.class);
-
-  /**
-   * Common method to load content of all configuration resources defined in
-   * 'config-definition.xml'.
-   * 
-   * @param configDefFilePath
-   *          the config def file path
-   * @return Configuration
-   */
-  public static Configuration getConfiguration(String configDefFilePath) {
-    if (configurationsCache.containsKey(configDefFilePath)) {
-      return configurationsCache.get(configDefFilePath);
-    }
-    CombinedConfiguration configuration = null;
-    synchronized (configurationsCache) {
-      if (configurationsCache.containsKey(configDefFilePath)) {
-        return configurationsCache.get(configDefFilePath);
-      }
-      DefaultConfigurationBuilder builder = new DefaultConfigurationBuilder();
-      String fielPath = getConfigDefFilePath(configDefFilePath);
-      LOGGER.info("loading from 'configDefFilePath' :" + fielPath);
-      builder.setFile(new File(fielPath));
-      try {
-        configuration = builder.getConfiguration(true);
-        configurationsCache.put(fielPath, configuration);
-      } catch (ConfigurationException e) {
-        LOGGER.info("Exception in loading property files.", e);
-      }
-    }
-    return configuration;
-  }
-
-  /**
-   * Removes the configuration created from a config definition file located at
-   * 'configDefFilePath'.
-   * 
-   * @param configDefFilePath
-   *          path to the config definition file
-   */
-  public static void clearConfiguration(String configDefFilePath) {
-    configurationsCache.remove(configDefFilePath);
-  }
-
-  /**
-   * Gets the configuration.
-   * 
-   * @return the configuration
-   */
-  public static Configuration getConfiguration() {
-    return getConfiguration(null);
-  }
-
-  /**
-   * Returns the 'config-definition.xml' file path. 1. If the param
-   * 'configDefFilePath' has a valid value, returns configDefFilePath 2. If the
-   * system property key 'configDefFilePath' has a valid value, returns the
-   * value 3. By default, it returns the file name 'config-definition.xml'
-   * 
-   * @param configDefFilePath
-   *          given input path to the config definition file
-   * @return the config def file path
-   */
-  private static String getConfigDefFilePath(String configDefFilePath) {
-    if (StringUtils.isNotEmpty(configDefFilePath)) {
-      return configDefFilePath;
-    }
-    return DEFAULT_CONFIG_DEFINITION_FILE_NAME;
-  }
-
-  /**
-   * The main method.
-   * 
-   * @param args
-   *          the args
-   * @throws InterruptedException
-   *           the interrupted exception
-   */
-  public static void main(String[] args) throws InterruptedException {
-    Configuration config = ConfigurationManager
-        .getConfiguration("/Users/Sayi/Documents/config/config-definition-dpi.xml");
-    System.out.println("elastic.search.cluster ="
-        + config.getString("elastic.search.cluster"));
-    Thread.sleep(10000);
-    System.out.println("storm.topology.dpi.bolt.es-index.index.name ="
-        + config.getString("storm.topology.dpi.bolt.es-index.index.name"));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/dataloads/interfaces/ThreatIntelSource.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/dataloads/interfaces/ThreatIntelSource.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/dataloads/interfaces/ThreatIntelSource.java
deleted file mode 100644
index 4e87a1c..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/dataloads/interfaces/ThreatIntelSource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads.interfaces;
-
-import java.util.Iterator;
-import org.apache.commons.configuration.Configuration;
-import org.json.simple.JSONObject;
-
-public interface ThreatIntelSource extends Iterator<JSONObject> {
-
-	void initializeSource(Configuration config);
-	void cleanupSource();
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java
deleted file mode 100644
index d21c686..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.domain;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.utils.ConfigurationsUtils;
-
-import java.nio.file.Path;
-import java.util.Map;
-
-public class Configuration extends Configurations {
-
-    protected CuratorFramework curatorFramework = null;
-    private Path configFileRoot;
-
-    public Configuration(CuratorFramework curatorFramework){
-
-        this.curatorFramework = curatorFramework;
-
-    }
-
-
-    public Configuration(Path configFileRoot){
-
-        this.configFileRoot = configFileRoot;
-    }
-
-    public void update() throws Exception {
-
-        if( null != curatorFramework ) {
-
-            ConfigurationsUtils.updateConfigsFromZookeeper(this, this.curatorFramework);
-
-        } else {
-
-            updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(configFileRoot.toAbsolutePath().toString()));
-            Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(configFileRoot.toAbsolutePath().toString());
-            for(String sensorType: sensorEnrichmentConfigs.keySet()) {
-                updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfigs.get(sensorType));
-            }
-
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
deleted file mode 100644
index 63e0f95..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.domain;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.log4j.Logger;
-import org.apache.metron.utils.JSONUtils;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class Configurations implements Serializable {
-
-  private static final Logger LOG = Logger.getLogger(Configurations.class);
-
-  public enum Type {
-    GLOBAL, SENSOR, OTHER
-  }
-
-  public static final String GLOBAL_CONFIG_NAME = "global";
-
-  private ConcurrentMap<String, Object> configurations = new ConcurrentHashMap<>();
-
-  @SuppressWarnings("unchecked")
-  public Map<String, Object> getGlobalConfig() {
-    return (Map<String, Object>) configurations.get(GLOBAL_CONFIG_NAME);
-  }
-
-  public void updateGlobalConfig(byte[] data) throws IOException {
-    updateGlobalConfig(new ByteArrayInputStream(data));
-  }
-
-  public void updateGlobalConfig(InputStream io) throws IOException {
-    Map<String, Object> globalConfig = JSONUtils.INSTANCE.load(io, new TypeReference<Map<String, Object>>() {
-    });
-    updateGlobalConfig(globalConfig);
-  }
-
-  public void updateGlobalConfig(Map<String, Object> globalConfig) {
-    configurations.put(GLOBAL_CONFIG_NAME, globalConfig);
-  }
-
-  public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) {
-    return (SensorEnrichmentConfig) configurations.get(sensorType);
-  }
-
-  public void updateSensorEnrichmentConfig(String sensorType, byte[] data) throws IOException {
-    updateSensorEnrichmentConfig(sensorType, new ByteArrayInputStream(data));
-  }
-
-  public void updateSensorEnrichmentConfig(String sensorType, InputStream io) throws IOException {
-    SensorEnrichmentConfig sensorEnrichmentConfig = JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class);
-    updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfig);
-  }
-
-  public void updateSensorEnrichmentConfig(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig) {
-    configurations.put(sensorType, sensorEnrichmentConfig);
-  }
-
-  @SuppressWarnings("unchecked")
-  public Map<String, Object> getConfig(String name) {
-    return (Map<String, Object>) configurations.get(name);
-  }
-
-  public void updateConfig(String name, byte[] data) throws IOException {
-    if (data == null) throw new IllegalStateException("config data cannot be null");
-    Map<String, Object> config = JSONUtils.INSTANCE.load(new ByteArrayInputStream(data), new TypeReference<Map<String, Object>>() {});
-    updateConfig(name, config);
-  }
-
-  public void updateConfig(String name, Map<String, Object> config) {
-    configurations.put(name, config);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    Configurations that = (Configurations) o;
-    return configurations.equals(that.configurations);
-  }
-
-  @Override
-  public int hashCode() {
-    return configurations.hashCode();
-  }
-
-  @Override
-  public String toString() {
-    return configurations.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
deleted file mode 100644
index 6f43739..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.domain;
-
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class Enrichment<T extends EnrichmentAdapter> implements Serializable {
-
-  private String type;
-  private List<String> fields;
-  private T adapter;
-
-  public Enrichment() {}
-
-  public Enrichment(String type, T adapter) {
-    this.type = type;
-    this.adapter = adapter;
-  }
-
-
-  public List<String> getFields() {
-    return fields;
-  }
-
-  public void setFields(List<String> fields) {
-    this.fields = fields;
-  }
-
-  public String getType() {
-    return type;
-  }
-
-  public void setType(String type) {
-    this.type = type;
-  }
-
-  public T getAdapter() {
-    return adapter;
-  }
-
-  public void setAdapter(T adapter) {
-    this.adapter = adapter;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
deleted file mode 100644
index ea345ca..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.domain;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.metron.utils.JSONUtils;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class SensorEnrichmentConfig {
-
-  private String index;
-  private Map<String, List<String>> enrichmentFieldMap;
-  private Map<String, List<String>> threatIntelFieldMap;
-  private Map<String, List<String>> fieldToEnrichmentTypeMap = new HashMap<>();
-  private Map<String, List<String>> fieldToThreatIntelTypeMap = new HashMap<>();
-  private int batchSize;
-
-  public String getIndex() {
-    return index;
-  }
-
-  public void setIndex(String index) {
-    this.index = index;
-  }
-
-  public Map<String, List<String>> getEnrichmentFieldMap() {
-    return enrichmentFieldMap;
-  }
-
-  public void setEnrichmentFieldMap(Map<String, List<String>> enrichmentFieldMap) {
-    this.enrichmentFieldMap = enrichmentFieldMap;
-  }
-
-  public Map<String, List<String>> getThreatIntelFieldMap() {
-    return threatIntelFieldMap;
-  }
-
-  public void setThreatIntelFieldMap(Map<String, List<String>> threatIntelFieldMap) {
-    this.threatIntelFieldMap = threatIntelFieldMap;
-  }
-
-  public Map<String, List<String>> getFieldToEnrichmentTypeMap() {
-    return fieldToEnrichmentTypeMap;
-  }
-
-  public Map<String, List<String>> getFieldToThreatIntelTypeMap() {
-    return fieldToThreatIntelTypeMap;
-  }
-  public void setFieldToEnrichmentTypeMap(Map<String, List<String>> fieldToEnrichmentTypeMap) {
-    this.fieldToEnrichmentTypeMap = fieldToEnrichmentTypeMap;
-  }
-
-  public void setFieldToThreatIntelTypeMap(Map<String, List<String>> fieldToThreatIntelTypeMap) {
-    this.fieldToThreatIntelTypeMap= fieldToThreatIntelTypeMap;
-  }
-  public int getBatchSize() {
-    return batchSize;
-  }
-
-  public void setBatchSize(int batchSize) {
-    this.batchSize = batchSize;
-  }
-
-  public static SensorEnrichmentConfig fromBytes(byte[] config) throws IOException {
-    return JSONUtils.INSTANCE.load(new String(config), SensorEnrichmentConfig.class);
-  }
-
-  public String toJSON() throws JsonProcessingException {
-    return JSONUtils.INSTANCE.toJSON(this, true);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    SensorEnrichmentConfig that = (SensorEnrichmentConfig) o;
-
-    if (getBatchSize() != that.getBatchSize()) return false;
-    if (getIndex() != null ? !getIndex().equals(that.getIndex()) : that.getIndex() != null) return false;
-    if (getEnrichmentFieldMap() != null ? !getEnrichmentFieldMap().equals(that.getEnrichmentFieldMap()) : that.getEnrichmentFieldMap() != null)
-      return false;
-    if (getThreatIntelFieldMap() != null ? !getThreatIntelFieldMap().equals(that.getThreatIntelFieldMap()) : that.getThreatIntelFieldMap() != null)
-      return false;
-    if (getFieldToEnrichmentTypeMap() != null ? !getFieldToEnrichmentTypeMap().equals(that.getFieldToEnrichmentTypeMap()) : that.getFieldToEnrichmentTypeMap() != null)
-      return false;
-    return getFieldToThreatIntelTypeMap() != null ? getFieldToThreatIntelTypeMap().equals(that.getFieldToThreatIntelTypeMap()) : that.getFieldToThreatIntelTypeMap() == null;
-
-  }
-
-  @Override
-  public String toString() {
-    return "{index=" + index + ", batchSize=" + batchSize +
-            ", enrichmentFieldMap=" + enrichmentFieldMap +
-            ", threatIntelFieldMap" + threatIntelFieldMap +
-            ", fieldToEnrichmentTypeMap=" + fieldToEnrichmentTypeMap +
-            ", fieldToThreatIntelTypeMap=" + fieldToThreatIntelTypeMap + "}";
-  }
-
-  @Override
-  public int hashCode() {
-    int result = getIndex() != null ? getIndex().hashCode() : 0;
-    result = 31 * result + (getEnrichmentFieldMap() != null ? getEnrichmentFieldMap().hashCode() : 0);
-    result = 31 * result + (getThreatIntelFieldMap() != null ? getThreatIntelFieldMap().hashCode() : 0);
-    result = 31 * result + (getFieldToEnrichmentTypeMap() != null ? getFieldToEnrichmentTypeMap().hashCode() : 0);
-    result = 31 * result + (getFieldToThreatIntelTypeMap() != null ? getFieldToThreatIntelTypeMap().hashCode() : 0);
-    result = 31 * result + getBatchSize();
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConfig.java
deleted file mode 100644
index 92913d9..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConfig.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment;
-
-import com.google.common.base.Joiner;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.Constants;
-import org.apache.metron.domain.SensorEnrichmentConfig;
-import org.apache.metron.utils.ConfigurationsUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-public class EnrichmentConfig {
-  public static enum Type {
-     THREAT_INTEL
-    ,ENRICHMENT
-  }
-
-  protected static final Logger _LOG = LoggerFactory.getLogger(EnrichmentConfig.class);
-  public static class FieldList {
-    Type type;
-    Map<String, List<String>> fieldToEnrichmentTypes;
-
-    public Type getType() {
-      return type;
-    }
-
-    public void setType(Type type) {
-      this.type = type;
-    }
-
-    public Map<String, List<String>> getFieldToEnrichmentTypes() {
-      return fieldToEnrichmentTypes;
-    }
-
-    public void setFieldToEnrichmentTypes(Map<String, List<String>> fieldToEnrichmentTypes) {
-      this.fieldToEnrichmentTypes = fieldToEnrichmentTypes;
-    }
-  }
-  public String zkQuorum;
-  public Map<String, FieldList> sensorToFieldList;
-
-  public String getZkQuorum() {
-    return zkQuorum;
-  }
-
-  public void setZkQuorum(String zkQuorum) {
-    this.zkQuorum = zkQuorum;
-  }
-
-  public Map<String, FieldList> getSensorToFieldList() {
-    return sensorToFieldList;
-  }
-
-  public void setSensorToFieldList(Map<String, FieldList> sensorToFieldList) {
-    this.sensorToFieldList = sensorToFieldList;
-  }
-
-  public void updateSensorConfigs( ) throws Exception {
-    CuratorFramework client = ConfigurationsUtils.getClient(getZkQuorum());
-    try {
-      client.start();
-      updateSensorConfigs(new ZKSourceConfigHandler(client), sensorToFieldList);
-    }
-    finally {
-      client.close();
-    }
-  }
-
-  public static interface SourceConfigHandler {
-    SensorEnrichmentConfig readConfig(String sensor) throws Exception;
-    void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception;
-  }
-
-  public static class ZKSourceConfigHandler implements SourceConfigHandler {
-    CuratorFramework client;
-    public ZKSourceConfigHandler(CuratorFramework client) {
-      this.client = client;
-    }
-    @Override
-    public SensorEnrichmentConfig readConfig(String sensor) throws Exception {
-      return SensorEnrichmentConfig.fromBytes(ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(sensor, client));
-    }
-
-    @Override
-    public void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception {
-      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensor, config.toJSON().getBytes(), client);
-    }
-  }
-
-  public static void updateSensorConfigs( SourceConfigHandler scHandler
-                                        , Map<String, FieldList> sensorToFieldList
-                                        ) throws Exception
-  {
-    Map<String, SensorEnrichmentConfig> sourceConfigsChanged = new HashMap<>();
-    for (Map.Entry<String, FieldList> kv : sensorToFieldList.entrySet()) {
-      SensorEnrichmentConfig config = sourceConfigsChanged.get(kv.getKey());
-      if(config == null) {
-        config = scHandler.readConfig(kv.getKey());
-        if(_LOG.isDebugEnabled()) {
-          _LOG.debug(config.toJSON());
-        }
-      }
-      Map<String, List<String> > fieldMap = null;
-      Map<String, List<String> > fieldToTypeMap = null;
-      List<String> fieldList = null;
-      if(kv.getValue().type == Type.THREAT_INTEL) {
-        fieldMap = config.getThreatIntelFieldMap();
-        if(fieldMap!= null) {
-          fieldList = fieldMap.get(Constants.SIMPLE_HBASE_THREAT_INTEL);
-        }
-        if(fieldList == null) {
-          fieldList = new ArrayList<>();
-          fieldMap.put(Constants.SIMPLE_HBASE_THREAT_INTEL, fieldList);
-        }
-        fieldToTypeMap = config.getFieldToThreatIntelTypeMap();
-        if(fieldToTypeMap == null) {
-          fieldToTypeMap = new HashMap<>();
-          config.setFieldToThreatIntelTypeMap(fieldToTypeMap);
-        }
-      }
-      else if(kv.getValue().type == Type.ENRICHMENT) {
-        fieldMap = config.getEnrichmentFieldMap();
-        if(fieldMap!= null) {
-          fieldList = fieldMap.get(Constants.SIMPLE_HBASE_ENRICHMENT);
-        }
-        if(fieldList == null) {
-          fieldList = new ArrayList<>();
-          fieldMap.put(Constants.SIMPLE_HBASE_ENRICHMENT, fieldList);
-        }
-        fieldToTypeMap = config.getFieldToEnrichmentTypeMap();
-        if(fieldToTypeMap == null) {
-          fieldToTypeMap = new HashMap<>();
-          config.setFieldToEnrichmentTypeMap(fieldToTypeMap);
-        }
-      }
-      if(fieldToTypeMap == null  || fieldMap == null) {
-        _LOG.debug("fieldToTypeMap is null or fieldMap is null, so skipping");
-        continue;
-      }
-      //Add the additional fields to the field list associated with the hbase adapter
-      {
-        HashSet<String> fieldSet = new HashSet<>(fieldList);
-        List<String> additionalFields = new ArrayList<>();
-        for (String field : kv.getValue().getFieldToEnrichmentTypes().keySet()) {
-          if (!fieldSet.contains(field)) {
-            additionalFields.add(field);
-          }
-        }
-        //adding only the ones that we don't already have to the field list
-        if (additionalFields.size() > 0) {
-          _LOG.debug("Adding additional fields: " + Joiner.on(',').join(additionalFields));
-          fieldList.addAll(additionalFields);
-          sourceConfigsChanged.put(kv.getKey(), config);
-        }
-      }
-      //Add the additional enrichment types to the mapping between the fields
-      {
-        for(Map.Entry<String, List<String>> fieldToType : kv.getValue().getFieldToEnrichmentTypes().entrySet()) {
-          String field = fieldToType.getKey();
-          final HashSet<String> types = new HashSet<>(fieldToType.getValue());
-          int sizeBefore = 0;
-          if(fieldToTypeMap.containsKey(field)) {
-            List<String> typeList = fieldToTypeMap.get(field);
-            sizeBefore = new HashSet<>(typeList).size();
-            types.addAll(typeList);
-          }
-          int sizeAfter = types.size();
-          boolean changed = sizeBefore != sizeAfter;
-          if(changed) {
-            fieldToTypeMap.put(field, new ArrayList<String>() {{
-                addAll(types);
-              }});
-            sourceConfigsChanged.put(kv.getKey(), config);
-          }
-        }
-      }
-    }
-    for(Map.Entry<String, SensorEnrichmentConfig> kv : sourceConfigsChanged.entrySet()) {
-      scHandler.persistConfig(kv.getKey(), kv.getValue());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java
deleted file mode 100644
index 4f7be3b..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment;
-
-public class EnrichmentConstants {
-
-
-
-  public static final String INDEX_NAME = "index.name";
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
deleted file mode 100644
index 28f9956..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.interfaces;
-
-import org.json.simple.JSONObject;
-
-public interface EnrichmentAdapter<T>
-{
-	void logAccess(T value);
-	JSONObject enrich(T value);
-	boolean initializeAdapter();
-	void cleanup();
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
deleted file mode 100644
index 35da040..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.Serializable;
-
-/**
- * Created by cstella on 1/29/16.
- */
-public abstract class Connector {
-  protected TableConfig tableConf;
-  protected String _quorum;
-  protected String _port;
-
-  public Connector(final TableConfig conf, String _quorum, String _port) throws IOException {
-    this.tableConf = conf;
-    this._quorum = _quorum;
-    this._port = _port;
-  }
-  public abstract void put(Put put) throws IOException;
-  public abstract void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
deleted file mode 100644
index 6caa016..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-
-
-import java.io.IOException;
-import java.util.Map;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.log4j.Logger;
-import org.json.simple.JSONObject;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import org.apache.metron.helpers.topology.ErrorUtils;
-
-/**
- * A Storm bolt for putting data into HBase.
- * <p>
- * By default works in batch mode by enabling HBase's client-side write buffer. Enabling batch mode
- * is recommended for high throughput, but it can be disabled in {@link TupleTableConfig}.
- * <p>
- * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered in the
- * classpath
- * @see TupleTableConfig
- * @see HTableConnector
- */
-@SuppressWarnings("serial")
-public class HBaseBolt implements IRichBolt {
-  private static final Logger LOG = Logger.getLogger(HBaseBolt.class);
-  private static final String DEFAULT_ZK_PORT = "2181";
-
-  protected OutputCollector collector;
-  protected TupleTableConfig conf;
-  protected boolean autoAck = true;
-  protected Connector connector;
-  private String _quorum;
-  private String _port;
-
-  public HBaseBolt(TupleTableConfig conf, String quorum, String port) {
-    this.conf = conf;
-    _quorum = quorum;
-    _port = port;
-  }
-  public HBaseBolt(final TupleTableConfig conf, String zkConnectString) throws IOException {
-    this(conf, zkConnectStringToHosts(zkConnectString), zkConnectStringToPort(zkConnectString));
-  }
-  public static String zkConnectStringToHosts(String connString) {
-    Iterable<String> hostPortPairs = Splitter.on(',').split(connString);
-    return Joiner.on(',').join(Iterables.transform(hostPortPairs, new Function<String, String>() {
-
-      @Override
-      public String apply(String hostPortPair) {
-        return Iterables.getFirst(Splitter.on(':').split(hostPortPair), "");
-      }
-    }));
-  }
-  public static String zkConnectStringToPort(String connString) {
-    String hostPortPair = Iterables.getFirst(Splitter.on(",").split(connString), "");
-    return Iterables.getLast(Splitter.on(":").split(hostPortPair),DEFAULT_ZK_PORT);
-  }
-
-
-  public Connector createConnector() throws IOException{
-    initialize();
-    return new HTableConnector(conf, _quorum, _port);
-  }
-
-  public void initialize() {
-    TupleTableConfig hbaseBoltConfig = conf;
-    String allColumnFamiliesColumnQualifiers = conf.getFields();
-    String[] tokenizedColumnFamiliesWithColumnQualifiers = StringUtils
-            .split(allColumnFamiliesColumnQualifiers, "\\|");
-    for (String tokenizedColumnFamilyWithColumnQualifiers : tokenizedColumnFamiliesWithColumnQualifiers) {
-      String[] cfCqTokens = StringUtils.split( tokenizedColumnFamilyWithColumnQualifiers, ":");
-      String columnFamily = cfCqTokens[0];
-      String[] columnQualifiers = StringUtils.split(cfCqTokens[1], ",");
-      for (String columnQualifier : columnQualifiers) {
-        hbaseBoltConfig.addColumn(columnFamily, columnQualifier);
-      }
-      setAutoAck(true);
-    }
-  }
-
-  /** {@inheritDoc} */
-  @SuppressWarnings("rawtypes")
-  
-  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    this.collector = collector;
-
-    try {
-      if(connector == null) {
-        this.connector = createConnector();
-      }
-		
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
-    LOG.info("Preparing HBaseBolt for table: " + this.conf.getTableName());
-  }
-
-  /** {@inheritDoc} */
-  
-  public void execute(Tuple input) {
-    try {
-      Put p = conf.getPutFromTuple(input);
-      this.connector.put(p);
-    } catch (IOException ex) {
-
-  		JSONObject error = ErrorUtils.generateErrorMessage(
-  				"Alerts problem: " + input.toString(), ex);
-  		collector.emit("error", new Values(error));
-  		
-      throw new RuntimeException(ex);
-    }
-
-    if (this.autoAck) {
-      this.collector.ack(input);
-    }
-  }
-
-  /** {@inheritDoc} */
-  
-  public void cleanup() {
-    this.connector.close();
-  }
-
-  /** {@inheritDoc} */
-  
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-	  declarer.declareStream("error", new Fields("HBase"));
-  }
-
-  /** {@inheritDoc} */
-  
-  public Map<String, Object> getComponentConfiguration() {
-    return null;
-  }
-
-  /**
-   * @return the autoAck
-   */
-  public boolean isAutoAck() {
-    return autoAck;
-  }
-
-  /**
-   * @param autoAck the autoAck to set
-   */
-  public void setAutoAck(boolean autoAck) {
-    this.autoAck = autoAck;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseStreamPartitioner.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseStreamPartitioner.java
deleted file mode 100644
index 519f76c..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseStreamPartitioner.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HTable;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-
-public class HBaseStreamPartitioner implements CustomStreamGrouping {
-
-  private static final long serialVersionUID = -148324019395976092L;
-  private String[] regionStartKeys = { "0" };
-  private Map<String, String> regionStartKeyRegionNameMap = new HashMap<String, String>();
-
-  private List<Integer> targetTasks = null;
-  private int targetTasksSize = 0;
-  private int rowKeyFieldIndex = 0;
-  private String tableName = null;
-  private long regionCheckTime = 0;
-  private int regionInforRefreshIntervalInMins = 60;
-  private int regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;
-
-  HTable hTable = null;;
-
-  
-  public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
-    
-    System.out.println("preparing HBaseStreamPartitioner for streamId " + stream.get_streamId());
-    this.targetTasks = targetTasks;
-    this.targetTasksSize = this.targetTasks.size();
-
-    Configuration conf = HBaseConfiguration.create();
-    try {
-      hTable = new HTable(conf, tableName);
-      refreshRegionInfo(tableName);
-
-      System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
-
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-  }
-  
-  public void prepare() {
-    
-    System.out.println("preparing HBaseStreamPartitioner for streamId " );
-
-    Configuration conf = HBaseConfiguration.create();
-    try {
-      hTable = new HTable(conf, tableName);
-      refreshRegionInfo(tableName);
-
-      System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
-
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-  }
-
-  public HBaseStreamPartitioner(String tableName, int rowKeyFieldIndex, int regionInforRefreshIntervalInMins) {
-    System.out.println("Created HBaseStreamPartitioner ");
-    this.rowKeyFieldIndex = rowKeyFieldIndex;
-    this.tableName = tableName;
-    this.regionInforRefreshIntervalInMins = regionInforRefreshIntervalInMins;
-    this.regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;
-
-  }
-
-  
-  public List<Integer> chooseTasks(int taskId, List<Object> values) {
-    List<Integer> choosenTasks = null;
-    System.out.println("Choosing task for taskId " + taskId + " and values " + values);
-
-    if (regionInforRefreshIntervalInMillis > (System.currentTimeMillis() - regionCheckTime)) {
-      try {
-        refreshRegionInfo(tableName);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-
-    int regionIndex = getRegionIndex((String) values.get(rowKeyFieldIndex));
-
-    if (regionIndex < targetTasksSize) {
-      choosenTasks = Arrays.asList(regionIndex);
-
-    } else {
-      choosenTasks = Arrays.asList(regionIndex % targetTasksSize);
-    }
-    System.out.println("Choosen tasks are " + choosenTasks);
-
-    return choosenTasks;
-
-
-  }
-
-  
-  
-  public int getRegionIndex(String key) {
-    int index = Arrays.binarySearch(regionStartKeys, key);
-    if (index < -1) {
-      index = (index + 2) * -1;
-    } else if (index == -1) {
-      index = 0;
-    }
-
-    return index;
-  }
-
-  private void refreshRegionInfo(String tableName) throws IOException {
-
-    System.out.println("in refreshRegionInfo ");
-
-    Map<HRegionInfo, ServerName> regionMap = hTable.getRegionLocations();
-
-    synchronized (regionStartKeys) {
-      synchronized (regionStartKeyRegionNameMap) {
-        regionStartKeys = new String[regionMap.size()];
-        int index = 0;
-        String startKey = null;
-        regionStartKeyRegionNameMap.clear();
-        for (HRegionInfo regionInfo : regionMap.keySet()) {
-          startKey = new String(regionInfo.getStartKey());
-          regionStartKeyRegionNameMap.put(startKey, regionInfo.getRegionNameAsString());
-          regionStartKeys[index] = startKey;
-          index++;
-        }
-
-        Arrays.sort(regionStartKeys);
-        regionCheckTime = System.currentTimeMillis();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
deleted file mode 100644
index d1a9327..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.hbase;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.log4j.Logger;
-
-import backtype.storm.generated.Bolt;
-
-import javax.annotation.Nullable;
-
-/**
- * HTable connector for Storm {@link Bolt}
- * <p>
- * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered in the
- * classpath
- */
-@SuppressWarnings("serial")
-public class HTableConnector extends Connector implements Serializable{
-  private static final Logger LOG = Logger.getLogger(HTableConnector.class);
-  private Configuration conf;
-  protected HTableInterface table;
-  private String tableName;
-  private String connectorImpl;
-
-
-  /**
-   * Initialize HTable connection
-   * @param conf The {@link TupleTableConfig}
-   * @throws IOException
-   */
-  public HTableConnector(final TableConfig conf, String _quorum, String _port) throws IOException {
-    super(conf, _quorum, _port);
-    this.connectorImpl = conf.getConnectorImpl();
-    this.tableName = conf.getTableName();
-    this.conf = HBaseConfiguration.create();
-    
-    if(_quorum != null && _port != null)
-    {
-    	this.conf.set("hbase.zookeeper.quorum", _quorum);
-    	this.conf.set("hbase.zookeeper.property.clientPort", _port);
-    }
-
-    LOG.info(String.format("Initializing connection to HBase table %s at %s", tableName,
-      this.conf.get("hbase.rootdir")));
-
-    try {
-      this.table = getTableProvider().getTable(this.conf, this.tableName);
-    } catch (IOException ex) {
-      throw new IOException("Unable to establish connection to HBase table " + this.tableName, ex);
-    }
-
-    if (conf.isBatch()) {
-      // Enable client-side write buffer
-      this.table.setAutoFlush(false, true);
-      LOG.info("Enabled client-side write buffer");
-    }
-
-    // If set, override write buffer size
-    if (conf.getWriteBufferSize() > 0) {
-      try {
-        this.table.setWriteBufferSize(conf.getWriteBufferSize());
-
-        LOG.info("Setting client-side write buffer to " + conf.getWriteBufferSize());
-      } catch (IOException ex) {
-        LOG.error("Unable to set client-side write buffer size for HBase table " + this.tableName,
-          ex);
-      }
-    }
-
-    // Check the configured column families exist
-    for (String cf : conf.getColumnFamilies()) {
-      if (!columnFamilyExists(cf)) {
-        throw new RuntimeException(String.format(
-          "HBase table '%s' does not have column family '%s'", conf.getTableName(), cf));
-      }
-    }
-  }
-
-  protected TableProvider getTableProvider() throws IOException {
-    if(connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
-      return new HTableProvider();
-    }
-    else {
-      try {
-        Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(connectorImpl);
-        return clazz.getConstructor().newInstance();
-      } catch (InstantiationException e) {
-        throw new IOException("Unable to instantiate connector.", e);
-      } catch (IllegalAccessException e) {
-        throw new IOException("Unable to instantiate connector: illegal access", e);
-      } catch (InvocationTargetException e) {
-        throw new IOException("Unable to instantiate connector", e);
-      } catch (NoSuchMethodException e) {
-        throw new IOException("Unable to instantiate connector: no such method", e);
-      } catch (ClassNotFoundException e) {
-        throw new IOException("Unable to instantiate connector: class not found", e);
-      }
-    }
-  }
-
-  /**
-   * Checks to see if table contains the given column family
-   * @param columnFamily The column family name
-   * @return boolean
-   * @throws IOException
-   */
-  private boolean columnFamilyExists(final String columnFamily) throws IOException {
-    return this.table.getTableDescriptor().hasFamily(Bytes.toBytes(columnFamily));
-  }
-
-  /**
-   * @return the table
-   */
-  public HTableInterface getTable() {
-    return table;
-  }
-
-  @Override
-  public void put(Put put) throws IOException {
-      table.put(put);
-  }
-
-  /**
-   * Close the table
-   */
-  @Override
-  public void close() {
-    try {
-      this.table.close();
-    } catch (IOException ex) {
-      LOG.error("Unable to close connection to HBase table " + tableName, ex);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
deleted file mode 100644
index e454f04..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-
-import java.io.IOException;
-
-public class HTableProvider implements TableProvider {
-    @Override
-    public HTableInterface getTable(Configuration config, String tableName) throws IOException {
-        return new HTable(config, tableName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java
deleted file mode 100644
index de2e929..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-
-public class TableConfig implements Serializable {
-    static final long serialVersionUID = -1L;
-    private String tableName;
-    private boolean batch = true;
-    protected Map<String, Set<String>> columnFamilies = new HashMap<>();
-    private long writeBufferSize = 0L;
-    private String connectorImpl;
-
-    public TableConfig() {
-
-    }
-
-    public TableConfig(String tableName) {
-        this.tableName = tableName;
-    }
-
-    public String getTableName() {
-        return tableName;
-    }
-
-    public TableConfig withConnectorImpl(String impl) {
-        connectorImpl = impl;
-        return this;
-    }
-
-    public TableConfig withTable(String table) {
-        this.tableName = table;
-        return this;
-    }
-
-    public TableConfig withBatch(Boolean isBatch) {
-        this.batch = isBatch;
-        return this;
-    }
-
-    public String getConnectorImpl() {
-        return connectorImpl;
-    }
-
-    /**
-     * @return Whether batch mode is enabled
-     */
-    public boolean isBatch() {
-        return batch;
-    }
-
-    /**
-     * @param batch
-     *          Whether to enable HBase's client-side write buffer.
-     *          <p>
-     *          When enabled your bolt will store put operations locally until the
-     *          write buffer is full, so they can be sent to HBase in a single RPC
-     *          call. When disabled each put operation is effectively an RPC and
-     *          is sent straight to HBase. As your bolt can process thousands of
-     *          values per second it is recommended that the write buffer is
-     *          enabled.
-     *          <p>
-     *          Enabled by default
-     */
-    public void setBatch(boolean batch) {
-        this.batch = batch;
-    }
-    /**
-     * @param writeBufferSize
-     *          Overrides the client-side write buffer size.
-     *          <p>
-     *          By default the write buffer size is 2 MB (2097152 bytes). If you
-     *          are storing larger data, you may want to consider increasing this
-     *          value to allow your bolt to efficiently group together a larger
-     *          number of records per RPC
-     *          <p>
-     *          Overrides the write buffer size you have set in your
-     *          hbase-site.xml e.g. <code>hbase.client.write.buffer</code>
-     */
-    public void setWriteBufferSize(long writeBufferSize) {
-        this.writeBufferSize = writeBufferSize;
-    }
-
-    /**
-     * @return the writeBufferSize
-     */
-    public long getWriteBufferSize() {
-        return writeBufferSize;
-    }
-    /**
-     * @return A Set of configured column families
-     */
-    public Set<String> getColumnFamilies() {
-        return this.columnFamilies.keySet();
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java
deleted file mode 100644
index dc0569e..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-public interface TableProvider extends Serializable {
-    HTableInterface getTable(Configuration config, String tableName) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
deleted file mode 100644
index a9ec20a..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.TreeMap;
-
-import com.google.common.base.Joiner;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.log4j.Logger;
-
-/**
- * Configuration for Storm {@link Tuple} to HBase serialization.
- */
-@SuppressWarnings("serial")
-public class TupleTableConfig extends TableConfig implements Serializable {
-  private static final Logger LOG = Logger.getLogger(TupleTableConfig.class);
-  static final long serialVersionUID = -1L;
-  public static final long DEFAULT_INCREMENT = 1L;
-  
-  protected String tupleRowKeyField;
-  protected String tupleTimestampField;
-  protected Durability durability = Durability.USE_DEFAULT;
-  private String fields;
-
-  /**
-   * Initialize configuration
-   * 
-   * @param table
-   *          The HBase table name
-   * @param rowKeyField
-   *          The {@link Tuple} field used to set the rowKey
-   */
-  public TupleTableConfig(final String table, final String rowKeyField) {
-    super(table);
-    this.tupleRowKeyField = rowKeyField;
-    this.tupleTimestampField = "";
-    this.columnFamilies = new HashMap<String, Set<String>>();
-  }
-  
-  /**
-   * Initialize configuration
-   * 
-   * @param table
-   *          The HBase table name
-   * @param rowKeyField
-   *          The {@link Tuple} field used to set the rowKey
-   * @param timestampField
-   *          The {@link Tuple} field used to set the timestamp
-   */
-  public TupleTableConfig(final String table, final String rowKeyField, final String timestampField) {
-    super(table);
-    this.tupleRowKeyField = rowKeyField;
-    this.tupleTimestampField = timestampField;
-    this.columnFamilies = new HashMap<String, Set<String>>();
-  }
-
-  public TupleTableConfig() {
-    super(null);
-    this.columnFamilies = new HashMap<String, Set<String>>();
-  }
-
-
-
-  public TupleTableConfig withRowKeyField(String rowKeyField) {
-    this.tupleRowKeyField = rowKeyField;
-    return this;
-  }
-
-  public TupleTableConfig withTimestampField(String timestampField) {
-    this.tupleTimestampField = timestampField;
-    return this;
-  }
-
-  public TupleTableConfig withFields(String fields) {
-    this.fields = fields;
-    return this;
-  }
-
-
-
-  public String getFields() {
-    return fields;
-  }
-
-
-
-  /**
-   * Add column family and column qualifier to be extracted from tuple
-   * 
-   * @param columnFamily
-   *          The column family name
-   * @param columnQualifier
-   *          The column qualifier name
-   */
-  public void addColumn(final String columnFamily, final String columnQualifier) {
-    Set<String> columns = this.columnFamilies.get(columnFamily);
-    
-    if (columns == null) {
-      columns = new HashSet<String>();
-    }
-    columns.add(columnQualifier);
-    
-    this.columnFamilies.put(columnFamily, columns);
-  }
-  
-  /**
-   * Creates a HBase {@link Put} from a Storm {@link Tuple}
-   * 
-   * @param tuple
-   *          The {@link Tuple}
-   * @return {@link Put}
-   */
-  public Put getPutFromTuple(final Tuple tuple) throws IOException{
-    byte[] rowKey = null;
-    try {
-      rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
-    }
-    catch(IllegalArgumentException iae) {
-      throw new IOException("Unable to retrieve " + tupleRowKeyField + " from " + tuple + " [ " + Joiner.on(',').join(tuple.getFields()) + " ]", iae);
-    }
-    
-    long ts = 0;
-    if (!tupleTimestampField.equals("")) {
-      ts = tuple.getLongByField(tupleTimestampField);
-    }
-    
-    Put p = new Put(rowKey);
-    
-    p.setDurability(durability);
-    
-    if (columnFamilies.size() > 0) {
-      for (String cf : columnFamilies.keySet()) {
-        byte[] cfBytes = Bytes.toBytes(cf);
-        for (String cq : columnFamilies.get(cf)) {
-          byte[] cqBytes = Bytes.toBytes(cq);
-          byte[] val = tuple.getBinaryByField(cq);
-          
-          if (ts > 0) {
-            p.add(cfBytes, cqBytes, ts, val);
-          } else {
-            p.add(cfBytes, cqBytes, val);
-          }
-        }
-      }
-    }
-    
-    return p;
-  }
-  
-  /**
-   * Creates a HBase {@link Increment} from a Storm {@link Tuple}
-   * 
-   * @param tuple
-   *          The {@link Tuple}
-   * @param increment
-   *          The amount to increment the counter by
-   * @return {@link Increment}
-   */
-  public Increment getIncrementFromTuple(final Tuple tuple, final long increment) {
-    byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
-    
-    Increment inc = new Increment(rowKey);
-    inc.setDurability(durability);
-    
-    if (columnFamilies.size() > 0) {
-      for (String cf : columnFamilies.keySet()) {
-        byte[] cfBytes = Bytes.toBytes(cf);
-        for (String cq : columnFamilies.get(cf)) {
-          byte[] val;
-          try {
-            val = Bytes.toBytes(tuple.getStringByField(cq));
-          } catch (IllegalArgumentException ex) {
-            // if cq isn't a tuple field, use cq for counter instead of tuple
-            // value
-            val = Bytes.toBytes(cq);
-          }
-          inc.addColumn(cfBytes, val, increment);
-        }
-      }
-    }
-    
-    return inc;
-  }
-  
-  /**
-   * Increment the counter for the given family and column by the specified
-   * amount
-   * <p>
-   * If the family and column already exist in the Increment the counter value
-   * is incremented by the specified amount rather than overridden, as it is in
-   * HBase's {@link Increment#addColumn(byte[], byte[], long)} method
-   * 
-   * @param inc
-   *          The {@link Increment} to update
-   * @param family
-   *          The column family
-   * @param qualifier
-   *          The column qualifier
-   * @param amount
-   *          The amount to increment the counter by
-   */
-  public static void addIncrement(Increment inc, final byte[] family, final byte[] qualifier, final Long amount) {
-    
-    NavigableMap<byte[], Long> set = inc.getFamilyMapOfLongs().get(family);
-    if (set == null) {
-      set = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
-    }
-    
-    // If qualifier exists, increment amount
-    Long counter = set.get(qualifier);
-    if (counter == null) {
-      counter = 0L;
-    }
-    set.put(qualifier, amount + counter);
-    
-    inc.getFamilyMapOfLongs().put(family, set);
-  }
-  
-
-
-  /**
-   * @param durability
-   *          Sets whether to write to HBase's edit log.
-   *          <p>
-   *          Setting to false will mean fewer operations to perform when
-   *          writing to HBase and hence better performance, but changes that
-   *          haven't been flushed to a store file will be lost in the event of
-   *          HBase failure
-   *          <p>
-   *          Enabled by default
-   */
-  public void setDurability(Durability durability) {
-    this.durability = durability;
-  }
-  
-  
-  public Durability getDurability() {
-    return  durability;
-  }
-  
-
-
-  /**
-   * @return the tupleRowKeyField
-   */
-  public String getTupleRowKeyField() {
-    return tupleRowKeyField;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java
deleted file mode 100644
index c58dc22..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.hbase.converters;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.reference.lookup.LookupKey;
-import org.apache.metron.reference.lookup.LookupValue;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.*;
-
-
-public abstract class AbstractConverter<KEY_T extends LookupKey, VALUE_T extends LookupValue> implements HbaseConverter<KEY_T,VALUE_T> {
-  public static Function<Cell, Map.Entry<byte[], byte[]>> CELL_TO_ENTRY  = new Function<Cell, Map.Entry<byte[], byte[]>>() {
-
-    @Nullable
-    @Override
-    public Map.Entry<byte[], byte[]> apply(@Nullable Cell cell) {
-      return new AbstractMap.SimpleEntry<>(cell.getQualifier(), cell.getValue());
-    }
-  };
-  @Override
-  public Put toPut(String columnFamily, KEY_T key, VALUE_T values) throws IOException {
-    Put put = new Put(key.toBytes());
-    byte[] cf = Bytes.toBytes(columnFamily);
-    for(Map.Entry<byte[], byte[]> kv : values.toColumns()) {
-      put.add(cf, kv.getKey(), kv.getValue());
-    }
-    return put;
-  }
-
-  public LookupKV<KEY_T, VALUE_T> fromPut(Put put, String columnFamily, KEY_T key, VALUE_T value) throws IOException {
-    key.fromBytes(put.getRow());
-    byte[] cf = Bytes.toBytes(columnFamily);
-    value.fromColumns(Iterables.transform(put.getFamilyCellMap().get(cf), CELL_TO_ENTRY));
-    return new LookupKV<>(key, value);
-  }
-
-  @Override
-  public Result toResult(String columnFamily, KEY_T key, VALUE_T values) throws IOException {
-    Put put = toPut(columnFamily, key, values);
-    return Result.create(put.getFamilyCellMap().get(Bytes.toBytes(columnFamily)));
-  }
-
-  public LookupKV<KEY_T, VALUE_T> fromResult(Result result, String columnFamily, KEY_T key, VALUE_T value) throws IOException {
-    if(result == null || result.getRow() == null) {
-      return null;
-    }
-    key.fromBytes(result.getRow());
-    byte[] cf = Bytes.toBytes(columnFamily);
-    NavigableMap<byte[], byte[]> cols = result.getFamilyMap(cf);
-    value.fromColumns(cols.entrySet());
-    return new LookupKV<>(key, value);
-  }
-  @Override
-  public Get toGet(String columnFamily, KEY_T key) {
-    Get ret = new Get(key.toBytes());
-    ret.addFamily(Bytes.toBytes(columnFamily));
-    return ret;
-  }
-
-  public static Iterable<Map.Entry<byte[], byte[]>> toEntries(byte[]... kvs) {
-    if(kvs.length % 2 != 0)  {
-      throw new IllegalStateException("Must be an even size");
-    }
-    List<Map.Entry<byte[], byte[]>> ret = new ArrayList<>(kvs.length/2);
-    for(int i = 0;i < kvs.length;i += 2) {
-      ret.add(new AbstractMap.SimpleImmutableEntry<>(kvs[i], kvs[i+1])) ;
-    }
-    return ret;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java
deleted file mode 100644
index 449d9cf..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.hbase.converters;
-
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.reference.lookup.LookupKey;
-import org.apache.metron.reference.lookup.LookupValue;
-
-import java.io.IOException;
-
-public interface HbaseConverter<KEY_T extends LookupKey, VALUE_T extends LookupValue> {
-    Put toPut(String columnFamily, KEY_T key, VALUE_T values) throws IOException;
-
-    LookupKV<KEY_T, VALUE_T> fromPut(Put put, String columnFamily) throws IOException;
-
-    Result toResult(String columnFamily, KEY_T key, VALUE_T values) throws IOException;
-
-    LookupKV<KEY_T, VALUE_T> fromResult(Result result, String columnFamily) throws IOException;
-
-    Get toGet(String columnFamily, KEY_T key);
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentConverter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentConverter.java
deleted file mode 100644
index a044498..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentConverter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase.converters.enrichment;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.metron.hbase.converters.AbstractConverter;
-import org.apache.metron.reference.lookup.LookupKV;
-
-import java.io.IOException;
-
-public class EnrichmentConverter extends AbstractConverter<EnrichmentKey, EnrichmentValue> {
-
-  @Override
-  public LookupKV<EnrichmentKey, EnrichmentValue> fromPut(Put put, String columnFamily) throws IOException {
-    return fromPut(put, columnFamily, new EnrichmentKey(), new EnrichmentValue());
-  }
-
-  @Override
-  public LookupKV<EnrichmentKey, EnrichmentValue> fromResult(Result result, String columnFamily) throws IOException {
-    return fromResult(result, columnFamily, new EnrichmentKey(), new EnrichmentValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentHelper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentHelper.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentHelper.java
deleted file mode 100644
index a3d1b66..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/enrichment/EnrichmentHelper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase.converters.enrichment;
-
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.metron.reference.lookup.LookupKV;
-
-import java.io.IOException;
-
-public enum EnrichmentHelper {
-    INSTANCE;
-    EnrichmentConverter converter = new EnrichmentConverter();
-
-    public void load(HTableInterface table, String cf, Iterable<LookupKV<EnrichmentKey, EnrichmentValue>> results) throws IOException {
-        for(LookupKV<EnrichmentKey, EnrichmentValue> result : results) {
-            Put put = converter.toPut(cf, result.getKey(), result.getValue());
-            table.put(put);
-        }
-    }
-}