You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2016/04/26 16:46:11 UTC
[23/51] [partial] incubator-metron git commit: METRON-113 Project
Reorganization (merrimanr) closes apache/incubator-metron#88
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/pom.xml b/metron-platform/metron-hbase/pom.xml
new file mode 100644
index 0000000..fc12808
--- /dev/null
+++ b/metron-platform/metron-hbase/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-platform</artifactId>
+ <version>0.1BETA</version>
+ </parent>
+ <artifactId>metron-hbase</artifactId>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <mysql.version>5.1.31</mysql.version>
+ <slf4j.version>1.7.7</slf4j.version>
+ <storm.hdfs.version>0.1.2</storm.hdfs.version>
+ <guava.version>${global_hbase_guava_version}</guava.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${global_hbase_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/Connector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/Connector.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/Connector.java
new file mode 100644
index 0000000..e787e43
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/Connector.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase;
+
+import org.apache.hadoop.hbase.client.Put;
+
+import java.io.IOException;
+
+public abstract class Connector {
+ protected TableConfig tableConf;
+ protected String _quorum;
+ protected String _port;
+
+ public Connector(final TableConfig conf, String _quorum, String _port) throws IOException {
+ this.tableConf = conf;
+ this._quorum = _quorum;
+ this._port = _port;
+ }
+ public abstract void put(Put put) throws IOException;
+ public abstract void close();
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableConnector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableConnector.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableConnector.java
new file mode 100644
index 0000000..ace4d80
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableConnector.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+import backtype.storm.generated.Bolt;
+
+/**
+ * HTable connector for Storm {@link Bolt}
+ * <p>
+ * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered in the
+ * classpath
+ */
+@SuppressWarnings("serial")
+public class HTableConnector extends Connector implements Serializable{
+ private static final Logger LOG = Logger.getLogger(HTableConnector.class);
+ private Configuration conf;
+ protected HTableInterface table;
+ private String tableName;
+ private String connectorImpl;
+
+
+ /**
+ * Initialize HTable connection
+ * @param conf The {@link TupleTableConfig}
+ * @throws IOException
+ */
+ public HTableConnector(final TableConfig conf, String _quorum, String _port) throws IOException {
+ super(conf, _quorum, _port);
+ this.connectorImpl = conf.getConnectorImpl();
+ this.tableName = conf.getTableName();
+ this.conf = HBaseConfiguration.create();
+
+ if(_quorum != null && _port != null)
+ {
+ this.conf.set("hbase.zookeeper.quorum", _quorum);
+ this.conf.set("hbase.zookeeper.property.clientPort", _port);
+ }
+
+ LOG.info(String.format("Initializing connection to HBase table %s at %s", tableName,
+ this.conf.get("hbase.rootdir")));
+
+ try {
+ this.table = getTableProvider().getTable(this.conf, this.tableName);
+ } catch (IOException ex) {
+ throw new IOException("Unable to establish connection to HBase table " + this.tableName, ex);
+ }
+
+ if (conf.isBatch()) {
+ // Enable client-side write buffer
+ this.table.setAutoFlush(false, true);
+ LOG.info("Enabled client-side write buffer");
+ }
+
+ // If set, override write buffer size
+ if (conf.getWriteBufferSize() > 0) {
+ try {
+ this.table.setWriteBufferSize(conf.getWriteBufferSize());
+
+ LOG.info("Setting client-side write buffer to " + conf.getWriteBufferSize());
+ } catch (IOException ex) {
+ LOG.error("Unable to set client-side write buffer size for HBase table " + this.tableName,
+ ex);
+ }
+ }
+
+ // Check the configured column families exist
+ for (String cf : conf.getColumnFamilies()) {
+ if (!columnFamilyExists(cf)) {
+ throw new RuntimeException(String.format(
+ "HBase table '%s' does not have column family '%s'", conf.getTableName(), cf));
+ }
+ }
+ }
+
+ protected TableProvider getTableProvider() throws IOException {
+ if(connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
+ return new HTableProvider();
+ }
+ else {
+ try {
+ Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(connectorImpl);
+ return clazz.getConstructor().newInstance();
+ } catch (InstantiationException e) {
+ throw new IOException("Unable to instantiate connector.", e);
+ } catch (IllegalAccessException e) {
+ throw new IOException("Unable to instantiate connector: illegal access", e);
+ } catch (InvocationTargetException e) {
+ throw new IOException("Unable to instantiate connector", e);
+ } catch (NoSuchMethodException e) {
+ throw new IOException("Unable to instantiate connector: no such method", e);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Unable to instantiate connector: class not found", e);
+ }
+ }
+ }
+
+ /**
+ * Checks to see if table contains the given column family
+ * @param columnFamily The column family name
+ * @return boolean
+ * @throws IOException
+ */
+ private boolean columnFamilyExists(final String columnFamily) throws IOException {
+ return this.table.getTableDescriptor().hasFamily(Bytes.toBytes(columnFamily));
+ }
+
+ /**
+ * @return the table
+ */
+ public HTableInterface getTable() {
+ return table;
+ }
+
+ @Override
+ public void put(Put put) throws IOException {
+ table.put(put);
+ }
+
+ /**
+ * Close the table
+ */
+ @Override
+ public void close() {
+ try {
+ this.table.close();
+ } catch (IOException ex) {
+ LOG.error("Unable to close connection to HBase table " + tableName, ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableProvider.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableProvider.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableProvider.java
new file mode 100644
index 0000000..e454f04
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableProvider.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+import java.io.IOException;
+
+public class HTableProvider implements TableProvider {
+ @Override
+ public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+ return new HTable(config, tableName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableConfig.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableConfig.java
new file mode 100644
index 0000000..de2e929
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableConfig.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+
/**
 * Serializable description of an HBase table used by the connectors: the table
 * name, client-side batching options, and the column families expected to
 * exist on the table.
 */
public class TableConfig implements Serializable {
  static final long serialVersionUID = -1L;

  private String tableName;                                          // target HBase table
  private boolean batch = true;                                      // write buffering on by default
  protected Map<String, Set<String>> columnFamilies = new HashMap<>();
  private long writeBufferSize = 0L;                                 // 0 = use the HBase default
  private String connectorImpl;                                      // optional TableProvider class name

  /** Create an empty configuration; populate it via the with* builders. */
  public TableConfig() {
  }

  /** @param tableName the HBase table this configuration targets */
  public TableConfig(String tableName) {
    this.tableName = tableName;
  }

  /** @return the configured table name */
  public String getTableName() {
    return tableName;
  }

  /** Fluent setter for the TableProvider implementation class name. */
  public TableConfig withConnectorImpl(String providerClass) {
    this.connectorImpl = providerClass;
    return this;
  }

  /** Fluent setter for the table name. */
  public TableConfig withTable(String name) {
    this.tableName = name;
    return this;
  }

  /** Fluent setter for batch mode (client-side write buffering). */
  public TableConfig withBatch(Boolean isBatch) {
    this.batch = isBatch;
    return this;
  }

  /** @return the configured TableProvider implementation class name, or null */
  public String getConnectorImpl() {
    return connectorImpl;
  }

  /**
   * @return Whether batch mode is enabled
   */
  public boolean isBatch() {
    return batch;
  }

  /**
   * Enable or disable HBase's client-side write buffer.
   * <p>
   * When enabled, put operations are stored locally until the write buffer is
   * full so they can be sent to HBase in a single RPC call; when disabled each
   * put operation is effectively its own RPC. Enabled by default.
   *
   * @param batch whether to enable the client-side write buffer
   */
  public void setBatch(boolean batch) {
    this.batch = batch;
  }

  /**
   * Override the client-side write buffer size (HBase default is 2 MB /
   * 2097152 bytes). Larger values let a bolt group more records per RPC; this
   * overrides any <code>hbase.client.write.buffer</code> from hbase-site.xml.
   *
   * @param writeBufferSize buffer size in bytes
   */
  public void setWriteBufferSize(long writeBufferSize) {
    this.writeBufferSize = writeBufferSize;
  }

  /** @return the configured write buffer size in bytes (0 = HBase default) */
  public long getWriteBufferSize() {
    return writeBufferSize;
  }

  /** @return A Set of configured column family names */
  public Set<String> getColumnFamilies() {
    return columnFamilies.keySet();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java
new file mode 100644
index 0000000..dc0569e
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+import java.io.IOException;
+import java.io.Serializable;
+
/**
 * Factory abstraction for obtaining HBase table handles, allowing connectors
 * and tests to substitute alternate table implementations by class name.
 */
public interface TableProvider extends Serializable {
  /**
   * Obtain a handle to the named HBase table.
   *
   * @param config    HBase client configuration
   * @param tableName name of the table to open
   * @return a table handle
   * @throws IOException if the table cannot be opened
   */
  HTableInterface getTable(Configuration config, String tableName) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TupleTableConfig.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
new file mode 100644
index 0000000..8257d8a
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.log4j.Logger;
+
+/**
+ * Configuration for Storm {@link Tuple} to HBase serialization.
+ */
+@SuppressWarnings("serial")
+public class TupleTableConfig extends TableConfig implements Serializable {
+ private static final Logger LOG = Logger.getLogger(TupleTableConfig.class);
+ static final long serialVersionUID = -1L;
+ public static final long DEFAULT_INCREMENT = 1L;
+
+ protected String tupleRowKeyField;
+ protected String tupleTimestampField;
+ protected Durability durability = Durability.USE_DEFAULT;
+ private String fields;
+
+ /**
+ * Initialize configuration
+ *
+ * @param table
+ * The HBase table name
+ * @param rowKeyField
+ * The {@link Tuple} field used to set the rowKey
+ */
+ public TupleTableConfig(final String table, final String rowKeyField) {
+ super(table);
+ this.tupleRowKeyField = rowKeyField;
+ this.tupleTimestampField = "";
+ this.columnFamilies = new HashMap<String, Set<String>>();
+ }
+
+ /**
+ * Initialize configuration
+ *
+ * @param table
+ * The HBase table name
+ * @param rowKeyField
+ * The {@link Tuple} field used to set the rowKey
+ * @param timestampField
+ * The {@link Tuple} field used to set the timestamp
+ */
+ public TupleTableConfig(final String table, final String rowKeyField, final String timestampField) {
+ super(table);
+ this.tupleRowKeyField = rowKeyField;
+ this.tupleTimestampField = timestampField;
+ this.columnFamilies = new HashMap<String, Set<String>>();
+ }
+
+ public TupleTableConfig() {
+ super(null);
+ this.columnFamilies = new HashMap<String, Set<String>>();
+ }
+
+
+
+ public TupleTableConfig withRowKeyField(String rowKeyField) {
+ this.tupleRowKeyField = rowKeyField;
+ return this;
+ }
+
+ public TupleTableConfig withTimestampField(String timestampField) {
+ this.tupleTimestampField = timestampField;
+ return this;
+ }
+
+ public TupleTableConfig withFields(String fields) {
+ this.fields = fields;
+ return this;
+ }
+
+
+
+ public String getFields() {
+ return fields;
+ }
+
+
+
+ /**
+ * Add column family and column qualifier to be extracted from tuple
+ *
+ * @param columnFamily
+ * The column family name
+ * @param columnQualifier
+ * The column qualifier name
+ */
+ public void addColumn(final String columnFamily, final String columnQualifier) {
+ Set<String> columns = this.columnFamilies.get(columnFamily);
+
+ if (columns == null) {
+ columns = new HashSet<String>();
+ }
+ columns.add(columnQualifier);
+
+ this.columnFamilies.put(columnFamily, columns);
+ }
+
+ /**
+ * Creates a HBase {@link Put} from a Storm {@link Tuple}
+ *
+ * @param tuple
+ * The {@link Tuple}
+ * @return {@link Put}
+ */
+ public Put getPutFromTuple(final Tuple tuple) throws IOException{
+ byte[] rowKey = null;
+ try {
+ rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+ }
+ catch(IllegalArgumentException iae) {
+ throw new IOException("Unable to retrieve " + tupleRowKeyField + " from " + tuple + " [ " + Joiner.on(',').join(tuple.getFields()) + " ]", iae);
+ }
+
+ long ts = 0;
+ if (!tupleTimestampField.equals("")) {
+ ts = tuple.getLongByField(tupleTimestampField);
+ }
+
+ Put p = new Put(rowKey);
+
+ p.setDurability(durability);
+
+ if (columnFamilies.size() > 0) {
+ for (String cf : columnFamilies.keySet()) {
+ byte[] cfBytes = Bytes.toBytes(cf);
+ for (String cq : columnFamilies.get(cf)) {
+ byte[] cqBytes = Bytes.toBytes(cq);
+ byte[] val = tuple.getBinaryByField(cq);
+
+ if (ts > 0) {
+ p.add(cfBytes, cqBytes, ts, val);
+ } else {
+ p.add(cfBytes, cqBytes, val);
+ }
+ }
+ }
+ }
+
+ return p;
+ }
+
+ /**
+ * Creates a HBase {@link Increment} from a Storm {@link Tuple}
+ *
+ * @param tuple
+ * The {@link Tuple}
+ * @param increment
+ * The amount to increment the counter by
+ * @return {@link Increment}
+ */
+ public Increment getIncrementFromTuple(final Tuple tuple, final long increment) {
+ byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+
+ Increment inc = new Increment(rowKey);
+ inc.setDurability(durability);
+
+ if (columnFamilies.size() > 0) {
+ for (String cf : columnFamilies.keySet()) {
+ byte[] cfBytes = Bytes.toBytes(cf);
+ for (String cq : columnFamilies.get(cf)) {
+ byte[] val;
+ try {
+ val = Bytes.toBytes(tuple.getStringByField(cq));
+ } catch (IllegalArgumentException ex) {
+ // if cq isn't a tuple field, use cq for counter instead of tuple
+ // value
+ val = Bytes.toBytes(cq);
+ }
+ inc.addColumn(cfBytes, val, increment);
+ }
+ }
+ }
+
+ return inc;
+ }
+
+ /**
+ * Increment the counter for the given family and column by the specified
+ * amount
+ * <p>
+ * If the family and column already exist in the Increment the counter value
+ * is incremented by the specified amount rather than overridden, as it is in
+ * HBase's {@link Increment#addColumn(byte[], byte[], long)} method
+ *
+ * @param inc
+ * The {@link Increment} to update
+ * @param family
+ * The column family
+ * @param qualifier
+ * The column qualifier
+ * @param amount
+ * The amount to increment the counter by
+ */
+ public static void addIncrement(Increment inc, final byte[] family, final byte[] qualifier, final Long amount) {
+
+ NavigableMap<byte[], Long> set = inc.getFamilyMapOfLongs().get(family);
+ if (set == null) {
+ set = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ }
+
+ // If qualifier exists, increment amount
+ Long counter = set.get(qualifier);
+ if (counter == null) {
+ counter = 0L;
+ }
+ set.put(qualifier, amount + counter);
+
+ inc.getFamilyMapOfLongs().put(family, set);
+ }
+
+
+
+ /**
+ * @param durability
+ * Sets whether to write to HBase's edit log.
+ * <p>
+ * Setting to false will mean fewer operations to perform when
+ * writing to HBase and hence better performance, but changes that
+ * haven't been flushed to a store file will be lost in the event of
+ * HBase failure
+ * <p>
+ * Enabled by default
+ */
+ public void setDurability(Durability durability) {
+ this.durability = durability;
+ }
+
+
+ public Durability getDurability() {
+ return durability;
+ }
+
+
+
+ /**
+ * @return the tupleRowKeyField
+ */
+ public String getTupleRowKeyField() {
+ return tupleRowKeyField;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java
new file mode 100644
index 0000000..1fd69b3
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase.writer;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.common.interfaces.MessageWriter;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+public abstract class HBaseWriter implements MessageWriter<JSONObject>, Serializable {
+
+ private String tableName;
+ private String connectorImpl;
+ private TableProvider provider;
+ private HTableInterface table;
+
+ public HBaseWriter(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public HBaseWriter withProviderImpl(String connectorImpl) {
+ this.connectorImpl = connectorImpl;
+ return this;
+ }
+
+ @Override
+ public void init() {
+ final Configuration config = HBaseConfiguration.create();
+ try {
+ provider = ReflectionUtils.createInstance(connectorImpl, new HTableProvider());
+ table = provider.getTable(config, tableName);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void write(String sourceType, Configurations configurations, Tuple tuple, JSONObject message) throws Exception {
+ Put put = new Put(getKey(tuple, message));
+ Map<String, byte[]> values = getValues(tuple, message);
+ for(String column: values.keySet()) {
+ String[] columnParts = column.split(":");
+ long timestamp = getTimestamp(tuple, message);
+ if (timestamp > -1) {
+ put.addColumn(Bytes.toBytes(columnParts[0]), Bytes.toBytes(columnParts[1]), timestamp, values.get(column));
+ } else {
+ put.addColumn(Bytes.toBytes(columnParts[0]), Bytes.toBytes(columnParts[1]), values.get(column));
+ }
+ }
+ table.put(put);
+ }
+
+ @Override
+ public void close() throws Exception {
+ table.close();
+ }
+
+ public abstract byte[] getKey(Tuple tuple, JSONObject message);
+ public abstract long getTimestamp(Tuple tuple, JSONObject message);
+ public abstract Map<String, byte[]> getValues(Tuple tuple, JSONObject message);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/pom.xml b/metron-platform/metron-integration-test/pom.xml
new file mode 100644
index 0000000..b3c3f09
--- /dev/null
+++ b/metron-platform/metron-integration-test/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-platform</artifactId>
+ <version>0.1BETA</version>
+ </parent>
+ <artifactId>metron-integration-test</artifactId>
+ <description>Metron Integration Test</description>
+ <properties>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${global_junit_version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-core</artifactId>
+ <version>${global_flux_version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_hbase_guava_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${global_hbase_version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${global_hadoop_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.9.2</artifactId>
+ <version>${global_kafka_version}</version>
+ <classifier>test</classifier>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.9.2</artifactId>
+ <version>${global_kafka_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-enrichment</artifactId>
+ <version>0.1BETA</version>
+ </dependency>
+ <!--dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-parsers</artifactId>
+ <version>0.1BETA</version>
+ </dependency-->
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-test-utilities</artifactId>
+ <version>0.1BETA</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ </build>
+ <reporting>
+ </reporting>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
new file mode 100644
index 0000000..3f21c0d
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+import com.google.common.base.Function;
+import org.apache.metron.TestConstants;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+import org.apache.metron.common.cli.ConfigurationsUtils;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Properties;
+
+public abstract class BaseIntegrationTest {
+
+ protected KafkaWithZKComponent getKafkaComponent(final Properties topologyProperties, List<KafkaWithZKComponent.Topic> topics) {
+ return new KafkaWithZKComponent().withTopics(topics)
+ .withPostStartCallback(new Function<KafkaWithZKComponent, Void>() {
+ @Nullable
+ @Override
+ public Void apply(@Nullable KafkaWithZKComponent kafkaWithZKComponent) {
+ topologyProperties.setProperty("kafka.zk", kafkaWithZKComponent.getZookeeperConnect());
+ try {
+ ConfigurationsUtils.uploadConfigsToZookeeper(TestConstants.SAMPLE_CONFIG_PATH, kafkaWithZKComponent.getZookeeperConnect());
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ return null;
+ }
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ComponentRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ComponentRunner.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ComponentRunner.java
new file mode 100644
index 0000000..c938741
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ComponentRunner.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Orchestrates a set of named in-memory test components: starts them in a
+ * configurable order, polls a {@link Processor} until the system under test is
+ * ready (or a retry/time budget is exhausted), and stops them in a configurable
+ * order.
+ */
+public class ComponentRunner {
+  /** Fluent builder.  Defaults: 1s between polls, 5 retries, 120s overall budget. */
+  public static class Builder {
+    // LinkedHashMap so that registration order becomes the default start/stop order.
+    LinkedHashMap<String, InMemoryComponent> components;
+    String[] startupOrder;
+    String[] shutdownOrder;
+    // Milliseconds to sleep between readiness polls in process().
+    long timeBetweenAttempts = 1000;
+    // Maximum tolerated NOT_READY polls; a value <= 0 disables the retry limit.
+    int numRetries = 5;
+    // Overall wall-clock budget, in milliseconds, for process().
+    long maxTimeMS = 120000;
+    public Builder() {
+      components = new LinkedHashMap<String, InMemoryComponent>();
+    }
+
+    public Builder withNumRetries(int numRetries) {
+      this.numRetries = numRetries;
+      return this;
+    }
+
+    public Builder withMaxTimeMS(long maxTimeMS) {
+      this.maxTimeMS = maxTimeMS;
+      return this;
+    }
+
+    // Registers a component under a unique name; re-using a name replaces the
+    // previous component.
+    public Builder withComponent(String name, InMemoryComponent component) {
+      components.put(name, component);
+      return this;
+    }
+
+    public Builder withCustomStartupOrder(String[] startupOrder) {
+      this.startupOrder = startupOrder;
+      return this;
+    }
+    public Builder withCustomShutdownOrder(String[] shutdownOrder) {
+      this.shutdownOrder = shutdownOrder;
+      return this;
+    }
+    public Builder withMillisecondsBetweenAttempts(long timeBetweenAttempts) {
+      this.timeBetweenAttempts = timeBetweenAttempts;
+      return this;
+    }
+    // Snapshot of the component names in registration order.
+    private static String[] toOrderedList(Map<String, InMemoryComponent> components) {
+      String[] ret = new String[components.size()];
+      int i = 0;
+      for(String component : components.keySet()) {
+        ret[i++] = component;
+      }
+      return ret;
+    }
+    public ComponentRunner build() {
+      // Fall back to registration order when no explicit order was supplied.
+      if(shutdownOrder == null) {
+        shutdownOrder = toOrderedList(components);
+      }
+      if(startupOrder == null) {
+        startupOrder = toOrderedList(components);
+      }
+      return new ComponentRunner(components, startupOrder, shutdownOrder, timeBetweenAttempts, numRetries, maxTimeMS);
+    }
+
+  }
+
+  LinkedHashMap<String, InMemoryComponent> components;
+  String[] startupOrder;
+  String[] shutdownOrder;
+  long timeBetweenAttempts;
+  int numRetries;
+  long maxTimeMS;
+  public ComponentRunner( LinkedHashMap<String, InMemoryComponent> components
+                        , String[] startupOrder
+                        , String[] shutdownOrder
+                        , long timeBetweenAttempts
+                        , int numRetries
+                        , long maxTimeMS
+                        )
+  {
+    this.components = components;
+    this.startupOrder = startupOrder;
+    this.shutdownOrder = shutdownOrder;
+    this.timeBetweenAttempts = timeBetweenAttempts;
+    this.numRetries = numRetries;
+    this.maxTimeMS = maxTimeMS;
+  }
+
+  // Typed lookup of a registered component; throws ClassCastException when the
+  // registered component is not of the requested type, and returns null when the
+  // name is unknown.
+  public <T extends InMemoryComponent> T getComponent(String name, Class<T> clazz) {
+    return clazz.cast(getComponents().get(name));
+  }
+
+  public LinkedHashMap<String, InMemoryComponent> getComponents() {
+    return components;
+  }
+
+  // Starts components in startupOrder; a failure aborts the remaining starts.
+  public void start() throws UnableToStartException {
+    for(String componentName : startupOrder) {
+      components.get(componentName).start();
+    }
+  }
+  // Stops components in shutdownOrder.  NOTE(review): an unchecked exception from
+  // one component's stop() will skip the remaining stops — confirm this is
+  // acceptable for test teardown.
+  public void stop() {
+    for(String componentName : shutdownOrder) {
+      components.get(componentName).stop();
+    }
+  }
+
+
+  /**
+   * Polls the given processor until it reports READY and returns its result.
+   * Throws a RuntimeException when the overall time budget (maxTimeMS) is
+   * exceeded, when the retry budget (numRetries, if positive) is exhausted by
+   * NOT_READY results, or when the polling sleep is interrupted.
+   */
+  public <T> T process(Processor<T> successState) {
+    int retryCount = 0;
+    long start = System.currentTimeMillis();
+    while(true) {
+      // Enforce the wall-clock budget before each poll.
+      long duration = System.currentTimeMillis() - start;
+      if(duration > maxTimeMS) {
+        throw new RuntimeException("Took too long to complete: " + duration + " > " + maxTimeMS);
+      }
+      ReadinessState state = successState.process(this);
+      if(state == ReadinessState.READY) {
+        return successState.getResult();
+      }
+      else if(state == ReadinessState.NOT_READY) {
+        retryCount++;
+        // numRetries <= 0 means "retry until the time budget runs out".
+        if(numRetries > 0 && retryCount > numRetries) {
+          throw new RuntimeException("Too many retries: " + retryCount);
+        }
+      }
+      try {
+        Thread.sleep(timeBetweenAttempts);
+      } catch (InterruptedException e) {
+        throw new RuntimeException("Unable to sleep", e);
+      }
+    }
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
new file mode 100644
index 0000000..21019c3
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.*;
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.common.Constants;
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.converter.EnrichmentHelper;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.apache.metron.integration.components.FluxTopologyComponent;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+import org.apache.metron.integration.mock.MockGeoAdapter;
+import org.apache.metron.test.mock.MockHTable;
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import org.apache.metron.integration.utils.SampleUtil;
+import org.apache.metron.common.utils.JSONUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Stack;
+
+/**
+ * Abstract integration test for the enrichment topology.  Spins up in-memory
+ * Kafka/Zookeeper, mock HBase tables seeded with enrichment and threat-intel
+ * data, and the enrichment flux topology; pushes sample parsed YAF messages
+ * through Kafka and validates the enriched output both from the search index
+ * (via the subclass-provided component) and from the HDFS writer's local output.
+ * Subclasses plug in the concrete search component and result processor.
+ */
+public abstract class EnrichmentIntegrationTest extends BaseIntegrationTest {
+  private static final String SRC_IP = "ip_src_addr";
+  private static final String DST_IP = "ip_dst_addr";
+  private static final String MALICIOUS_IP_TYPE = "malicious_ip";
+  private static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification";
+  // Enrichment payload seeded into the mock enrichments table for 10.0.2.3;
+  // validated later in simpleEnrichmentValidation().
+  private static final Map<String, String> PLAYFUL_ENRICHMENT = new HashMap<String, String>() {{
+    put("orientation", "north");
+  }};
+  private String fluxPath = "../metron-enrichment/src/main/flux/enrichment/test.yaml";
+  protected String hdfsDir = "target/enrichmentIntegrationTest/hdfs";
+  private String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "YafExampleParsed";
+  // NOTE(review): sampleIndexedPath is not referenced in this class — possibly
+  // intended for subclasses; verify before removing.
+  private String sampleIndexedPath = TestConstants.SAMPLE_DATA_INDEXED_PATH + "YafIndexed";
+
+
+  /** TableProvider that hands out in-memory MockHTable instances instead of real HBase tables. */
+  public static class Provider implements TableProvider, Serializable {
+    MockHTable.Provider provider = new MockHTable.Provider();
+    @Override
+    public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+      return provider.getTable(config, tableName);
+    }
+  }
+
+  /**
+   * Recursively deletes the test's HDFS output files (names starting with
+   * "enrichment" or ending in ".json") under the given directory.  Directories
+   * themselves are left in place.
+   */
+  public static void cleanHdfsDir(String hdfsDirStr) {
+    File hdfsDir = new File(hdfsDirStr);
+    // Iterative depth-first traversal using an explicit stack.
+    Stack<File> fs = new Stack<>();
+    if(hdfsDir.exists()) {
+      fs.push(hdfsDir);
+      while(!fs.empty()) {
+        File f = fs.pop();
+        if (f.isDirectory()) {
+          for(File child : f.listFiles()) {
+            fs.push(child);
+          }
+        }
+        else {
+          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
+            f.delete();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads every output file under the given directory (names starting with
+   * "enrichment" or ending in ".json") and parses each line into a Map.
+   *
+   * @return the parsed documents; empty when the directory does not exist
+   * @throws IOException when a file cannot be read
+   */
+  public static List<Map<String, Object> > readDocsFromDisk(String hdfsDirStr) throws IOException {
+    List<Map<String, Object>> ret = new ArrayList<>();
+    File hdfsDir = new File(hdfsDirStr);
+    Stack<File> fs = new Stack<>();
+    if(hdfsDir.exists()) {
+      fs.push(hdfsDir);
+      while(!fs.empty()) {
+        File f = fs.pop();
+        if(f.isDirectory()) {
+          for (File child : f.listFiles()) {
+            fs.push(child);
+          }
+        }
+        else {
+          System.out.println("Processed " + f);
+          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
+            List<byte[]> data = TestUtils.readSampleData(f.getPath());
+            // Deserialize each raw line into a Map<String, Object> document.
+            Iterables.addAll(ret, Iterables.transform(data, new Function<byte[], Map<String, Object>>() {
+              @Nullable
+              @Override
+              public Map<String, Object> apply(@Nullable byte[] bytes) {
+                String s = new String(bytes);
+                try {
+                  return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() {
+                  });
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            }));
+          }
+        }
+      }
+    }
+    return ret;
+  }
+
+
+  /**
+   * End-to-end test: seeds mock HBase tables, starts kafka + search + storm,
+   * submits the enrichment topology, writes sample messages to the enrichment
+   * topic, then validates the enriched documents from both the search component
+   * and the HDFS output directory.
+   */
+  @Test
+  public void test() throws Exception {
+    cleanHdfsDir(hdfsDir);
+    final Configurations configurations = SampleUtil.getSampleConfigs();
+    final String dateFormat = "yyyy.MM.dd.HH";
+    final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
+    final String cf = "cf";
+    final String trackerHBaseTableName = "tracker";
+    final String threatIntelTableName = "threat_intel";
+    final String enrichmentsTableName = "enrichments";
+    // Topology properties consumed by the flux file; the known_hosts JSON drives
+    // the host enrichment assertions in hostEnrichmentValidation().
+    final Properties topologyProperties = new Properties() {{
+      setProperty("org.apache.metron.enrichment.host.known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"},\n" +
+              "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"},\n" +
+              "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"},\n" +
+              "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
+      setProperty("hbase.provider.impl","" + Provider.class.getName());
+      setProperty("threat.intel.tracker.table", trackerHBaseTableName);
+      setProperty("threat.intel.tracker.cf", cf);
+      setProperty("threat.intel.simple.hbase.table", threatIntelTableName);
+      setProperty("threat.intel.simple.hbase.cf", cf);
+      setProperty("enrichment.simple.hbase.table", enrichmentsTableName);
+      setProperty("enrichment.simple.hbase.cf", cf);
+      setProperty("es.clustername", "metron");
+      setProperty("es.port", "9300");
+      setProperty("es.ip", "localhost");
+      setProperty("index.date.format", dateFormat);
+      setProperty("index.hdfs.output", hdfsDir);
+    }};
+    // Let the concrete subclass add search-engine specific properties.
+    setAdditionalProperties(topologyProperties);
+    final KafkaWithZKComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaWithZKComponent.Topic>() {{
+      add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+    }});
+
+    //create MockHBaseTables
+    final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTableName, cf);
+    final MockHTable threatIntelTable = (MockHTable)MockHTable.Provider.addToCache(threatIntelTableName, cf);
+    // Mark 10.0.2.3 as a malicious IP so the threat-intel validation has a hit.
+    EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>(){{
+      add(new LookupKV<>(new EnrichmentKey(MALICIOUS_IP_TYPE, "10.0.2.3"), new EnrichmentValue(new HashMap<String, String>())));
+    }});
+    final MockHTable enrichmentTable = (MockHTable)MockHTable.Provider.addToCache(enrichmentsTableName, cf);
+    // Attach the "playful" enrichment to 10.0.2.3 for simpleEnrichmentValidation().
+    EnrichmentHelper.INSTANCE.load(enrichmentTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>(){{
+      add(new LookupKV<>(new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, "10.0.2.3")
+              , new EnrichmentValue(PLAYFUL_ENRICHMENT )
+              )
+      );
+    }});
+    FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
+            .withTopologyLocation(new File(fluxPath))
+            .withTopologyName("test")
+            .withTopologyProperties(topologyProperties)
+            .build();
+
+    InMemoryComponent searchComponent = getSearchComponent(topologyProperties);
+
+    UnitTestHelper.verboseLogging();
+    ComponentRunner runner = new ComponentRunner.Builder()
+            .withComponent("kafka", kafkaComponent)
+            .withComponent("search", searchComponent)
+            .withComponent("storm", fluxComponent)
+            .withMillisecondsBetweenAttempts(10000)
+            .withNumRetries(10)
+            .build();
+    runner.start();
+
+    try {
+      fluxComponent.submitTopology();
+
+      kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
+      // Poll the search component until all input messages have been indexed.
+      List<Map<String, Object>> docs = runner.process(getProcessor(inputMessages));
+      Assert.assertEquals(inputMessages.size(), docs.size());
+      List<Map<String, Object>> cleanedDocs = cleanDocs(docs);
+      validateAll(cleanedDocs);
+
+
+      // The HDFS writer output must mirror what was indexed, under a single
+      // "yaf" sensor directory.
+      List<Map<String, Object>> docsFromDisk = readDocsFromDisk(hdfsDir);
+      Assert.assertEquals(docsFromDisk.size(), docs.size()) ;
+      Assert.assertEquals(new File(hdfsDir).list().length, 1);
+      Assert.assertEquals(new File(hdfsDir).list()[0], "yaf");
+      validateAll(docsFromDisk);
+    }
+    finally {
+      cleanHdfsDir(hdfsDir);
+      runner.stop();
+    }
+  }
+
+  /**
+   * Normalizes every field name of every document via the subclass-provided
+   * cleanField() so index-specific field mangling does not break validation.
+   */
+  public List<Map<String, Object>> cleanDocs(List<Map<String, Object>> docs) {
+    List<Map<String, Object>> cleanedDocs = new ArrayList<>();
+    for(Map<String, Object> doc: docs) {
+      Map<String, Object> cleanedFields = new HashMap<>();
+      for(String field: doc.keySet()) {
+        cleanedFields.put(cleanField(field), doc.get(field));
+      }
+      cleanedDocs.add(cleanedFields);
+    }
+    return cleanedDocs;
+  }
+
+  /** Runs every validation (base, host, geo, threat intel, simple enrichment) on each document. */
+  public static void validateAll(List<Map<String, Object>> docs) {
+    for (Map<String, Object> doc : docs) {
+      baseValidation(doc);
+      hostEnrichmentValidation(doc);
+      geoEnrichmentValidation(doc);
+      threatIntelValidation(doc);
+      simpleEnrichmentValidation(doc);
+    }
+  }
+
+  /** Sanity checks that apply to every enriched document regardless of content. */
+  public static void baseValidation(Map<String, Object> jsonDoc) {
+    assertEnrichmentsExists("threatintels.", setOf("hbaseThreatIntel"), jsonDoc.keySet());
+    assertEnrichmentsExists("enrichments.", setOf("geo", "host", "hbaseEnrichment" ), jsonDoc.keySet());
+    for(Map.Entry<String, Object> kv : jsonDoc.entrySet()) {
+      //ensure no values are empty.
+      Assert.assertTrue(kv.getValue().toString().length() > 0);
+    }
+    //ensure we always have a source ip and destination ip
+    Assert.assertNotNull(jsonDoc.get(SRC_IP));
+    Assert.assertNotNull(jsonDoc.get(DST_IP));
+  }
+
+  /** A document plus the ip field (SRC_IP or DST_IP) a HostEnrichments predicate should inspect. */
+  private static class EvaluationPayload {
+    Map<String, Object> indexedDoc;
+    String key;
+    public EvaluationPayload(Map<String, Object> indexedDoc, String key) {
+      this.indexedDoc = indexedDoc;
+      this.key = key;
+    }
+  }
+
+  /**
+   * Predicates over the host enrichment fields
+   * ("enrichments.host.&lt;ip field&gt;.known_info.*") of a document, matching the
+   * known_hosts entries configured in test().
+   */
+  private static enum HostEnrichments implements Predicate<EvaluationPayload>{
+    LOCAL_LOCATION(new Predicate<EvaluationPayload>() {
+
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.local").equals("YES");
+      }
+    })
+    ,UNKNOWN_LOCATION(new Predicate<EvaluationPayload>() {
+
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.local").equals("UNKNOWN");
+      }
+    })
+    ,IMPORTANT(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.asset_value").equals("important");
+      }
+    })
+    ,PRINTER_TYPE(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.type").equals("printer");
+      }
+    })
+    ,WEBSERVER_TYPE(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.type").equals("webserver");
+      }
+    })
+    ,UNKNOWN_TYPE(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.type").equals("unknown");
+      }
+    })
+    ;
+
+    Predicate<EvaluationPayload> _predicate;
+    HostEnrichments(Predicate<EvaluationPayload> predicate) {
+      this._predicate = predicate;
+    }
+
+    public boolean apply(EvaluationPayload payload) {
+      return _predicate.apply(payload);
+    }
+
+  }
+
+  /**
+   * Asserts that every key under the given prefix (e.g. "enrichments.") has a
+   * second-level component (e.g. "geo") from the expected set — i.e. that no
+   * unexpected enrichment type sneaked into the document.
+   */
+  private static void assertEnrichmentsExists(String topLevel, Set<String> expectedEnrichments, Set<String> keys) {
+    for(String key : keys) {
+      if(key.startsWith(topLevel)) {
+        String secondLevel = Iterables.get(Splitter.on(".").split(key), 1);
+        String message = "Found an enrichment/threat intel (" + secondLevel + ") that I didn't expect (expected enrichments :"
+                + Joiner.on(",").join(expectedEnrichments) + "), but it was not there.  If you've created a new"
+                + " enrichment, then please add a validation method to this unit test.  Otherwise, it's a solid error"
+                + " and should be investigated.";
+        Assert.assertTrue( message, expectedEnrichments.contains(secondLevel));
+      }
+    }
+  }
+  /** Validates the hbaseEnrichment fields for documents involving the seeded ip 10.0.2.3. */
+  private static void simpleEnrichmentValidation(Map<String, Object> indexedDoc) {
+    if(indexedDoc.get(SRC_IP).equals("10.0.2.3")
+            || indexedDoc.get(DST_IP).equals("10.0.2.3")
+            ) {
+      Assert.assertTrue(keyPatternExists("enrichments.hbaseEnrichment", indexedDoc));
+      if(indexedDoc.get(SRC_IP).equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("enrichments.hbaseEnrichment." + SRC_IP + "." + PLAYFUL_CLASSIFICATION_TYPE+ ".orientation")
+                , PLAYFUL_ENRICHMENT.get("orientation")
+        );
+      }
+      else if(indexedDoc.get(DST_IP).equals("10.0.2.3")) {
+        Assert.assertEquals( indexedDoc.get("enrichments.hbaseEnrichment." + DST_IP + "." + PLAYFUL_CLASSIFICATION_TYPE + ".orientation")
+                , PLAYFUL_ENRICHMENT.get("orientation")
+        );
+      }
+    }
+
+  }
+  /** Validates threat-intel tagging: 10.0.2.3 traffic must be alerted, everything else must not be. */
+  private static void threatIntelValidation(Map<String, Object> indexedDoc) {
+    if(indexedDoc.get(SRC_IP).equals("10.0.2.3")
+            || indexedDoc.get(DST_IP).equals("10.0.2.3")
+            ) {
+      //if we have any threat intel messages, we want to tag is_alert to true
+      Assert.assertTrue(keyPatternExists("threatintels.", indexedDoc));
+      Assert.assertEquals(indexedDoc.get("is_alert"), "true");
+    }
+    else {
+      //For YAF this is the case, but if we do snort later on, this will be invalid.
+      Assert.assertNull(indexedDoc.get("is_alert"));
+      Assert.assertFalse(keyPatternExists("threatintels.", indexedDoc));
+    }
+    //ip threat intels
+    if(keyPatternExists("threatintels.hbaseThreatIntel.", indexedDoc)) {
+      if(indexedDoc.get(SRC_IP).equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("threatintels.hbaseThreatIntel." + SRC_IP + "." + MALICIOUS_IP_TYPE), "alert");
+      }
+      else if(indexedDoc.get(DST_IP).equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("threatintels.hbaseThreatIntel." + DST_IP + "." + MALICIOUS_IP_TYPE), "alert");
+      }
+      else {
+        Assert.fail("There was a threat intels that I did not expect: " + indexedDoc);
+      }
+    }
+
+  }
+
+  /** Validates that the mock geo adapter's fixed values are attached for both src and dst ips. */
+  private static void geoEnrichmentValidation(Map<String, Object> indexedDoc) {
+    //should have geo enrichment on every message due to mock geo adapter
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP +".location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".city"), MockGeoAdapter.DEFAULT_CITY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".city"), MockGeoAdapter.DEFAULT_CITY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".country"), MockGeoAdapter.DEFAULT_COUNTRY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".country"), MockGeoAdapter.DEFAULT_COUNTRY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
+  }
+
+  /**
+   * Validates host enrichment against the known_hosts entries configured in
+   * test(); documents whose ips are not known hosts must carry no host enrichment.
+   */
+  private static void hostEnrichmentValidation(Map<String, Object> indexedDoc) {
+    boolean enriched = false;
+    //important local printers
+    {
+      Set<String> ips = setOf("10.0.2.15", "10.60.10.254");
+      if (ips.contains(indexedDoc.get(SRC_IP))) {
+        //this is a local, important, printer
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.PRINTER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, SRC_IP))
+        );
+        enriched = true;
+      }
+      if (ips.contains(indexedDoc.get(DST_IP))) {
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.PRINTER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, DST_IP))
+        );
+        enriched = true;
+      }
+    }
+    //important local webservers
+    {
+      Set<String> ips = setOf("10.1.128.236");
+      if (ips.contains(indexedDoc.get(SRC_IP))) {
+        //this is a local, important, printer
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.WEBSERVER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, SRC_IP))
+        );
+        enriched = true;
+      }
+      if (ips.contains(indexedDoc.get(DST_IP))) {
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.WEBSERVER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, DST_IP))
+        );
+        enriched = true;
+      }
+    }
+    if(!enriched) {
+      Assert.assertFalse(keyPatternExists("enrichments.host", indexedDoc));
+    }
+  }
+
+
+  /** @return true when any key of the document starts with the given prefix. */
+  private static boolean keyPatternExists(String pattern, Map<String, Object> indexedObj) {
+    for(String k : indexedObj.keySet()) {
+      if(k.startsWith(pattern)) {
+        return true;
+      }
+    }
+    return false;
+  }
+  /** Convenience varargs constructor for a HashSet of strings. */
+  private static Set<String> setOf(String... items) {
+    Set<String> ret = new HashSet<>();
+    for(String item : items) {
+      ret.add(item);
+    }
+    return ret;
+  }
+
+  /** @return the in-memory search/index component (e.g. Elasticsearch or Solr) under test. */
+  abstract public InMemoryComponent getSearchComponent(Properties topologyProperties) throws Exception;
+  /** @return a processor that is READY once all input messages appear in the index. */
+  abstract public Processor<List<Map<String, Object>>> getProcessor(List<byte[]> inputMessages);
+  /** Hook for subclasses to add search-engine specific topology properties. */
+  abstract public void setAdditionalProperties(Properties topologyProperties);
+  /** Normalizes an index-specific field name back to the canonical field name. */
+  abstract public String cleanField(String field);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/InMemoryComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/InMemoryComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/InMemoryComponent.java
new file mode 100644
index 0000000..8a9ee96
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/InMemoryComponent.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+public interface InMemoryComponent {
+ public void start() throws UnableToStartException;
+ public void stop();
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/Processor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/Processor.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/Processor.java
new file mode 100644
index 0000000..bbcfb73
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/Processor.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
/**
 * A polled step in an integration test: the runner repeatedly calls
 * {@link #process} until it reports {@code READY}, then collects the
 * accumulated result via {@link #getResult()}.
 *
 * @param <T> the type of result produced once processing is complete
 */
public interface Processor<T> {
  ReadinessState process(ComponentRunner runner);
  T getResult();
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ReadinessState.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ReadinessState.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ReadinessState.java
new file mode 100644
index 0000000..5cdfbb4
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ReadinessState.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
/**
 * Signals whether a polled {@code Processor} has finished its work:
 * {@code READY} ends the polling loop, {@code NOT_READY} causes a retry.
 */
public enum ReadinessState {
  READY, NOT_READY;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/UnableToStartException.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/UnableToStartException.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/UnableToStartException.java
new file mode 100644
index 0000000..0fcda14
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/UnableToStartException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
/**
 * Thrown when an in-memory integration test component cannot be started.
 */
public class UnableToStartException extends Exception {
  // Exception implements Serializable; declare an explicit version id.
  private static final long serialVersionUID = 1L;

  public UnableToStartException(String message) {
    super(message);
  }

  public UnableToStartException(String message, Throwable t) {
    super(message, t);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
new file mode 100644
index 0000000..3bb0c56
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.components;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import org.apache.metron.integration.InMemoryComponent;
+import org.apache.metron.integration.UnableToStartException;
+import org.apache.storm.flux.FluxBuilder;
+import org.apache.storm.flux.model.ExecutionContext;
+import org.apache.storm.flux.model.TopologyDef;
+import org.apache.storm.flux.parser.FluxParser;
+import org.apache.thrift7.TException;
+import org.junit.Assert;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Properties;
+
+public class FluxTopologyComponent implements InMemoryComponent {
+ LocalCluster stormCluster;
+ String topologyName;
+ File topologyLocation;
+ Properties topologyProperties;
+
+ public static class Builder {
+ String topologyName;
+ File topologyLocation;
+ Properties topologyProperties;
+ public Builder withTopologyName(String name) {
+ this.topologyName = name;
+ return this;
+ }
+ public Builder withTopologyLocation(File location) {
+ this.topologyLocation = location;
+ return this;
+ }
+ public Builder withTopologyProperties(Properties properties) {
+ this.topologyProperties = properties;
+ return this;
+ }
+
+ public FluxTopologyComponent build() {
+ return new FluxTopologyComponent(topologyName, topologyLocation, topologyProperties);
+ }
+ }
+
+ public FluxTopologyComponent(String topologyName, File topologyLocation, Properties topologyProperties) {
+ this.topologyName = topologyName;
+ this.topologyLocation = topologyLocation;
+ this.topologyProperties = topologyProperties;
+ }
+
+ public LocalCluster getStormCluster() {
+ return stormCluster;
+ }
+
+ public String getTopologyName() {
+ return topologyName;
+ }
+
+ public File getTopologyLocation() {
+ return topologyLocation;
+ }
+
+ public Properties getTopologyProperties() {
+ return topologyProperties;
+ }
+
+ public void start() throws UnableToStartException{
+ try {
+ stormCluster = new LocalCluster();
+ } catch (Exception e) {
+ throw new UnableToStartException("Unable to start flux topology: " + getTopologyLocation(), e);
+ }
+ }
+
+ public void stop() {
+ stormCluster.shutdown();
+ }
+
+ public void submitTopology() throws NoSuchMethodException, IOException, InstantiationException, TException, IllegalAccessException, InvocationTargetException, ClassNotFoundException {
+ startTopology(getTopologyName(), getTopologyLocation(), getTopologyProperties());
+ }
+ private void startTopology(String topologyName, File topologyLoc, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException {
+ TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, properties);
+ Config conf = FluxBuilder.buildConfig(topologyDef);
+ ExecutionContext context = new ExecutionContext(topologyDef, conf);
+ StormTopology topology = FluxBuilder.buildTopology(context);
+ Assert.assertNotNull(topology);
+ topology.validate();
+ stormCluster.submitTopology(topologyName, conf, topology);
+ }
+
+ private static TopologyDef loadYaml(String topologyName, File yamlFile, Properties properties) throws IOException {
+ File tmpFile = File.createTempFile(topologyName, "props");
+ tmpFile.deleteOnExit();
+ FileWriter propWriter = null;
+ try {
+ propWriter = new FileWriter(tmpFile);
+ properties.store(propWriter, topologyName + " properties");
+ }
+ finally {
+ if(propWriter != null) {
+ propWriter.close();
+ return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
+ }
+
+ return null;
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
new file mode 100644
index 0000000..fb7bcde
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.components;
+
+
+import com.google.common.base.Function;
+import kafka.admin.AdminUtils;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.*;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.metron.integration.InMemoryComponent;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+
+public class KafkaWithZKComponent implements InMemoryComponent {
+
+
+ public static class Topic {
+ public int numPartitions;
+ public String name;
+
+ public Topic(String name, int numPartitions) {
+ this.numPartitions = numPartitions;
+ this.name = name;
+ }
+ }
+ private transient KafkaServer kafkaServer;
+ private transient EmbeddedZookeeper zkServer;
+ private transient ZkClient zkClient;
+ private transient ConsumerConnector consumer;
+ private String zookeeperConnectString;
+ private int brokerPort = 6667;
+ private List<Topic> topics = Collections.emptyList();
+ private Function<KafkaWithZKComponent, Void> postStartCallback;
+
+ public KafkaWithZKComponent withPostStartCallback(Function<KafkaWithZKComponent, Void> f) {
+ postStartCallback = f;
+ return this;
+ }
+
+ public KafkaWithZKComponent withExistingZookeeper(String zookeeperConnectString) {
+ this.zookeeperConnectString = zookeeperConnectString;
+ return this;
+ }
+
+ public KafkaWithZKComponent withBrokerPort(int brokerPort) {
+ if(brokerPort <= 0)
+ {
+ brokerPort = TestUtils.choosePort();
+ }
+ this.brokerPort = brokerPort;
+ return this;
+ }
+
+ public KafkaWithZKComponent withTopics(List<Topic> topics) {
+ this.topics = topics;
+ return this;
+ }
+
+ public List<Topic> getTopics() {
+ return topics;
+ }
+
+ public int getBrokerPort() {
+ return brokerPort;
+ }
+
+
+ public String getBrokerList() {
+ return "localhost:" + brokerPort;
+ }
+
+ public KafkaProducer<String, byte[]> createProducer()
+ {
+ return createProducer(new HashMap<String, Object>());
+ }
+
+ public KafkaProducer<String, byte[]> createProducer(Map<String, Object> properties)
+ {
+ Map<String, Object> producerConfig = new HashMap<>();
+ producerConfig.put("bootstrap.servers", getBrokerList());
+ producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerConfig.put("request.required.acks", "-1");
+ producerConfig.put("fetch.message.max.bytes", ""+ 1024*1024*10);
+ producerConfig.put("replica.fetch.max.bytes", "" + 1024*1024*10);
+ producerConfig.put("message.max.bytes", "" + 1024*1024*10);
+ producerConfig.put("message.send.max.retries", "10");
+ producerConfig.putAll(properties);
+ return new KafkaProducer<>(producerConfig);
+ }
+
+ @Override
+ public void start() {
+ // setup Zookeeper
+ if(zookeeperConnectString == null) {
+ String zkConnect = TestZKUtils.zookeeperConnect();
+ zkServer = new EmbeddedZookeeper(zkConnect);
+ zookeeperConnectString = zkServer.connectString();
+ }
+ zkClient = new ZkClient(zookeeperConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
+
+ // setup Broker
+ Properties props = TestUtils.createBrokerConfig(0, brokerPort, true);
+ KafkaConfig config = new KafkaConfig(props);
+ Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+ for(Topic topic : getTopics()) {
+ try {
+ createTopic(topic.name, topic.numPartitions, true);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Unable to create topic", e);
+ }
+ }
+ postStartCallback.apply(this);
+ }
+
+ public String getZookeeperConnect() {
+ return zookeeperConnectString;
+ }
+
+ @Override
+ public void stop() {
+ kafkaServer.shutdown();
+ zkClient.close();
+ if(zkServer != null) {
+ zkServer.shutdown();
+ }
+
+ }
+
+ public List<byte[]> readMessages(String topic) {
+ SimpleConsumer consumer = new SimpleConsumer("localhost", 6667, 100000, 64 * 1024, "consumer");
+ FetchRequest req = new FetchRequestBuilder()
+ .clientId("consumer")
+ .addFetch(topic, 0, 0, 100000)
+ .build();
+ FetchResponse fetchResponse = consumer.fetch(req);
+ Iterator<MessageAndOffset> results = fetchResponse.messageSet(topic, 0).iterator();
+ List<byte[]> messages = new ArrayList<>();
+ while(results.hasNext()) {
+ ByteBuffer payload = results.next().message().payload();
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ messages.add(bytes);
+ }
+ return messages;
+ }
+
+ public ConsumerIterator<byte[], byte[]> getStreamIterator(String topic) {
+ return getStreamIterator(topic, "group0", "consumer0");
+ }
+ public ConsumerIterator<byte[], byte[]> getStreamIterator(String topic, String group, String consumerName) {
+ // setup simple consumer
+ Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), group, consumerName, -1);
+ consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
+ Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+ topicCountMap.put(topic, 1);
+ Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+ KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
+ ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
+ return iterator;
+ }
+
+ public void shutdownConsumer() {
+ consumer.shutdown();
+ }
+
+ public void createTopic(String name) throws InterruptedException {
+ createTopic(name, 1, true);
+ }
+
+ public void waitUntilMetadataIsPropagated(String topic, int numPartitions) {
+ List<KafkaServer> servers = new ArrayList<>();
+ servers.add(kafkaServer);
+ for(int part = 0;part < numPartitions;++part) {
+ TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, part, 5000);
+ }
+ }
+
+ public void createTopic(String name, int numPartitions, boolean waitUntilMetadataIsPropagated) throws InterruptedException {
+ AdminUtils.createTopic(zkClient, name, numPartitions, 1, new Properties());
+ if(waitUntilMetadataIsPropagated) {
+ waitUntilMetadataIsPropagated(name, numPartitions);
+ }
+ }
+
+ public void writeMessages(String topic, Collection<byte[]> messages) {
+ KafkaProducer<String, byte[]> kafkaProducer = createProducer();
+ for(byte[] message: messages) {
+ kafkaProducer.send(new ProducerRecord<String, byte[]>(topic, message));
+ }
+ kafkaProducer.close();
+ }
+}
\ No newline at end of file