You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/04/05 21:42:09 UTC
[13/15] incubator-metron git commit: METRON 86: Adding Solr indexing
support (merrimanr via cestella) closes apache/incubator-metron#67
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
deleted file mode 100644
index 21ecb18..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.indexing;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.configuration.Configuration;
-import org.json.simple.JSONObject;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import org.apache.metron.helpers.topology.ErrorUtils;
-import org.apache.metron.index.interfaces.IndexAdapter;
-import org.apache.metron.json.serialization.JSONEncoderHelper;
-import org.apache.metron.metrics.MetricReporter;
-
-/**
- *
- * Bolt for indexing telemetry messages into Elastic Search, Solr, Druid, etc...
- * For a list of all adapters provided please see org.apache.metron.indexing.adapters
- *
- * As of release of this code the following adapters for indexing are provided:
- * <p>
- * <ul>
- *
- * <li>ESBulkAdapter = adapter that can bulk index messages into ES
- * <li>ESBulkRotatingAdapter = adapter that can bulk index messages into ES,
- * rotate the index, and apply an alias to the rotated index
- * <ul>
- * <p>
- *
- */
-
-@SuppressWarnings("serial")
-public class TelemetryIndexingBolt extends AbstractIndexingBolt {
-
- private JSONObject metricConfiguration;
- private String _indexDateFormat;
-
- private Set<Tuple> tuple_queue = new HashSet<Tuple>();
-
- public TelemetryIndexingBolt(String zookeeperUrl) {
- super(zookeeperUrl);
- }
-
- /**
- *
- * @param IndexIP
- * ip of ElasticSearch/Solr/etc...
- * @return instance of bolt
- */
- public TelemetryIndexingBolt withIndexIP(String IndexIP) {
- _IndexIP = IndexIP;
- return this;
- }
-
- /**
- *
- * @param IndexPort
- * port of ElasticSearch/Solr/etc...
- * @return instance of bolt
- */
-
- public TelemetryIndexingBolt withIndexPort(int IndexPort) {
- _IndexPort = IndexPort;
- return this;
- }
-
- /**
- *
- * @param IndexName
- * name of the index in ElasticSearch/Solr/etc...
- * @return instance of bolt
- */
- public TelemetryIndexingBolt withIndexName(String IndexName) {
- _IndexName = IndexName;
- return this;
- }
-
- /**
- *
- * @param ClusterName
- * name of cluster to index into in ElasticSearch/Solr/etc...
- * @return instance of bolt
- */
- public TelemetryIndexingBolt withClusterName(String ClusterName) {
- _ClusterName = ClusterName;
- return this;
- }
-
- /**
- *
- * @param DocumentName
- * name of document to be indexed in ElasticSearch/Solr/etc...
- * @return
- */
-
- public TelemetryIndexingBolt withDocumentName(String DocumentName) {
- _DocumentName = DocumentName;
- return this;
- }
-
- /**
- *
- * @param BulkIndexNumber
- * number of documents to bulk index together
- * @return instance of bolt
- */
- public TelemetryIndexingBolt withBulk(int BulkIndexNumber) {
- _BulkIndexNumber = BulkIndexNumber;
- return this;
- }
-
- /**
- *
- * @param adapter
- * adapter that handles indexing of JSON strings
- * @return instance of bolt
- */
- public TelemetryIndexingBolt withIndexAdapter(IndexAdapter adapter) {
- _adapter = adapter;
-
- return this;
- }
-
- /**
- *
- * @param indexTimestamp
- * timestamp to append to index names
- * @return instance of bolt
- */
- public TelemetryIndexingBolt withIndexTimestamp(String indexTimestamp) {
- _indexDateFormat = indexTimestamp;
-
- return this;
- }
- /**
- *
- * @param config
- * - configuration for pushing metrics into graphite
- * @return instance of bolt
- */
- public TelemetryIndexingBolt withMetricConfiguration(Configuration config) {
- this.metricConfiguration = JSONEncoderHelper.getJSON(config
- .subset("org.apache.metron.metrics"));
- return this;
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- void doPrepare(Map conf, TopologyContext topologyContext,
- OutputCollector collector) throws IOException {
-
- try {
-
- _adapter.initializeConnection(_IndexIP, _IndexPort,
- _ClusterName, _IndexName, _DocumentName, _BulkIndexNumber, _indexDateFormat);
-
- _reporter = new MetricReporter();
- _reporter.initialize(metricConfiguration,
- TelemetryIndexingBolt.class);
- this.registerCounters();
- } catch (Exception e) {
-
- e.printStackTrace();
-
- JSONObject error = ErrorUtils.generateErrorMessage(new String("bulk index problem"), e);
- _collector.emit("error", new Values(error));
- }
-
- }
-
- public void execute(Tuple tuple) {
-
- JSONObject message = null;
-
- try {
- LOG.trace("[Metron] Indexing bolt gets: " + message);
-
- message = (JSONObject) tuple.getValueByField("message");
-
- if (message == null || message.isEmpty())
- throw new Exception(
- "Could not parse message from binary stream");
-
- int result_code = _adapter.bulkIndex(message);
-
- if (result_code == 0) {
- tuple_queue.add(tuple);
- } else if (result_code == 1) {
- tuple_queue.add(tuple);
-
- Iterator<Tuple> iterator = tuple_queue.iterator();
- while(iterator.hasNext())
- {
- Tuple setElement = iterator.next();
- _collector.ack(setElement);
- ackCounter.inc();
- }
- tuple_queue.clear();
- } else if (result_code == 2) {
- throw new Exception("Failed to index elements with client");
- }
-
- } catch (Exception e) {
- e.printStackTrace();
-
-
- Iterator<Tuple> iterator = tuple_queue.iterator();
- while(iterator.hasNext())
- {
- Tuple setElement = iterator.next();
- _collector.fail(setElement);
- failCounter.inc();
-
-
- JSONObject error = ErrorUtils.generateErrorMessage(new String("bulk index problem"), e);
- _collector.emit("error", new Values(error));
- }
- tuple_queue.clear();
-
-
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declearer) {
- declearer.declareStream("error", new Fields("Index"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/AbstractIndexAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/AbstractIndexAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/AbstractIndexAdapter.java
deleted file mode 100644
index 58f5bed..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/AbstractIndexAdapter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.indexing.adapters;
-
-import java.io.Serializable;
-
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.metron.index.interfaces.IndexAdapter;
-import org.apache.metron.indexing.AbstractIndexingBolt;
-
-@SuppressWarnings("serial")
-public abstract class AbstractIndexAdapter implements IndexAdapter, Serializable{
-
- protected static final Logger _LOG = LoggerFactory
- .getLogger(AbstractIndexingBolt.class);
-
-
-
-
- abstract public boolean initializeConnection(String ip, int port,
- String cluster_name, String index_name, String document_name,
- int bulk, String date_format) throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBaseBulkAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBaseBulkAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBaseBulkAdapter.java
deleted file mode 100644
index 5e64b86..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBaseBulkAdapter.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.indexing.adapters;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.commons.collections.Bag;
-import org.apache.commons.collections.bag.HashBag;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.json.simple.JSONObject;
-
-@SuppressWarnings("serial")
-public class ESBaseBulkAdapter extends AbstractIndexAdapter implements
- Serializable {
-
- private int _bulk_size;
- private String _index_name;
- private String _document_name;
- private String _cluster_name;
- private int _port;
- private String _ip;
- public transient TransportClient client;
-
- private Bag bulk_set;
-
- private Settings settings;
-
- @Override
- public boolean initializeConnection(String ip, int port,
- String cluster_name, String index_name, String document_name,
- int bulk_size, String date_format) throws Exception {
-
- bulk_set = new HashBag();
-
- _LOG.trace("[Metron] Initializing ESBulkAdapter...");
-
- try {
- _ip = ip;
- _port = port;
- _cluster_name = cluster_name;
- _index_name = index_name;
- _document_name = document_name;
- _bulk_size = bulk_size;
-
- _LOG.trace("[Metron] Bulk indexing is set to: " + _bulk_size);
-
- settings = ImmutableSettings.settingsBuilder()
- .put("cluster.name", _cluster_name).build();
- client = new TransportClient(settings)
- .addTransportAddress(new InetSocketTransportAddress(_ip,
- _port));
-
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-
- /**
- * @param raw_message
- * message to bulk index in Elastic Search
- * @return integer (0) loaded into a bulk queue, (1) bulk indexing executed,
- * (2) error
- */
- @SuppressWarnings("unchecked")
- public int bulkIndex(JSONObject raw_message) {
-
- boolean success = true;
- int set_size = 0;
-
- synchronized (bulk_set) {
- bulk_set.add(raw_message);
- set_size = bulk_set.size();
-
- _LOG.trace("[Metron] Bulk size is now: " + bulk_set.size());
- }
-
- try {
-
- if (set_size >= _bulk_size) {
- success = doIndex();
-
- if (success)
- return 1;
- else
- return 2;
- }
-
- return 0;
-
- } catch (Exception e) {
- e.printStackTrace();
- return 2;
- }
- }
-
- public boolean doIndex() throws Exception {
-
- try {
-
- synchronized (bulk_set) {
- if (client == null)
- throw new Exception("client is null");
-
- BulkRequestBuilder bulkRequest = client.prepareBulk();
-
- Iterator<JSONObject> iterator = bulk_set.iterator();
-
- while (iterator.hasNext()) {
- JSONObject setElement = iterator.next();
-
- IndexRequestBuilder a = client.prepareIndex(_index_name,
- _document_name);
- a.setSource(setElement.toString());
- bulkRequest.add(a);
-
- }
-
- _LOG.trace("[Metron] Performing bulk load of size: "
- + bulkRequest.numberOfActions());
-
- BulkResponse resp = bulkRequest.execute().actionGet();
- _LOG.trace("[Metron] Received bulk response: "
- + resp.toString());
- bulk_set.clear();
- }
-
- return true;
- }
-
- catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-
- public void setOptionalSettings(Map<String, String> settings) {
- // TODO Auto-generated method stub
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBulkRotatingAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBulkRotatingAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBulkRotatingAdapter.java
deleted file mode 100644
index 1f8c50e..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBulkRotatingAdapter.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.indexing.adapters;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Map;
-
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.util.EntityUtils;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.json.simple.JSONObject;
-
-@SuppressWarnings({ "deprecation", "serial" })
-public class ESBulkRotatingAdapter extends AbstractIndexAdapter {
-
- private Client client;
- private BulkRequestBuilder bulkRequest;
- private int _bulk_size;
- private String _index_name;
- private String _document_name;
- private int element_count;
- private String index_postfix;
- private String running_index_postfix;
-
- private HttpClient httpclient;
- private HttpPost post;
-
- private DateFormat dateFormat;
-
- public boolean initializeConnection(String ip, int port,
- String cluster_name, String index_name, String document_name,
- int bulk_size, String date_format) {
-
- _LOG.info("Initializing ESBulkAdapter...");
-
- try {
- httpclient = new DefaultHttpClient();
- String alias_link = "http://" + ip + ":" + 9200 + "/_aliases";
- post = new HttpPost(alias_link);
-
- _index_name = index_name;
- _document_name = document_name;
-
- _bulk_size = bulk_size - 1;
-
-
- dateFormat = new SimpleDateFormat(date_format);
-
- element_count = 0;
- running_index_postfix = "NONE";
-
- Settings settings = ImmutableSettings.settingsBuilder()
- .put("cluster.name", cluster_name).build();
- client = new TransportClient(settings)
- .addTransportAddress(new InetSocketTransportAddress(ip,
- port));
-
- bulkRequest = client.prepareBulk();
-
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-
- @SuppressWarnings("unchecked")
- public int bulkIndex(JSONObject raw_message) {
-
- index_postfix = dateFormat.format(new Date());
-
- bulkRequest.add(client.prepareIndex(_index_name + "_" + index_postfix,
- _document_name).setSource(raw_message));
-
- return doIndex();
- }
-
- public int bulkIndex(String raw_message) {
-
- index_postfix = dateFormat.format(new Date());
-
- bulkRequest.add(client.prepareIndex(_index_name + "_" + index_postfix,
- _document_name).setSource(raw_message));
-
- return doIndex();
- }
-
- public int doIndex() {
-
- element_count++;
-
- if (element_count != _bulk_size)
- return 0;
-
- if (element_count == _bulk_size) {
- _LOG.debug("Starting bulk load of size: " + _bulk_size);
- BulkResponse resp = bulkRequest.execute().actionGet();
- element_count = 0;
- _LOG.debug("Received bulk response: " + resp.toString());
-
- if (resp.hasFailures()) {
- _LOG.error("Bulk update failed");
- return 2;
- }
-
- if (!running_index_postfix.equals(index_postfix)) {
-
- _LOG.debug("Attempting to apply a new alias");
-
- try {
-
- String alias = "{\"actions\" : [{ \"add\" : { \"index\" : \""
- + _index_name
- + "-"
- + index_postfix
- + "\", \"alias\" : \"" + _index_name + "\" } } ]}";
-
- post.setEntity(new StringEntity(alias));
-
- HttpResponse response = httpclient.execute(post);
- String res = EntityUtils.toString(response.getEntity());
-
- _LOG.debug("Alias request received the following response: "
- + res);
-
- running_index_postfix = index_postfix;
- }
-
- catch (Exception e) {
- e.printStackTrace();
- _LOG.error("Alias request failed...");
- return 2;
- }
- }
-
- index_postfix = dateFormat.format(new Date());
- }
-
- _LOG.debug("Adding to bulk load: element " + element_count
- + " of bulk size " + _bulk_size);
-
- return 1;
- }
-
- public void setOptionalSettings(Map<String, String> settings) {
- // TODO Auto-generated method stub
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESTimedRotatingAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESTimedRotatingAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESTimedRotatingAdapter.java
deleted file mode 100644
index fd4c067..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESTimedRotatingAdapter.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.indexing.adapters;
-
-import java.io.Serializable;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.commons.collections.Bag;
-import org.apache.commons.collections.HashBag;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.json.simple.JSONObject;
-
-@SuppressWarnings("serial")
-public class ESTimedRotatingAdapter extends AbstractIndexAdapter implements
- Serializable {
-
- private int _bulk_size;
- private String _index_name;
- private String _document_name;
- private String _cluster_name;
- private int _port;
- private String _ip;
- public transient TransportClient client;
- private DateFormat dateFormat;
-
- private Map<String, String> tuning_settings;
-
- private Bag bulk_set;
-
- private Settings settings;
-
- public void setOptionalSettings(Map<String, String> settings)
- {
- tuning_settings = settings;
- }
-
- @Override
- public boolean initializeConnection(String ip, int port,
- String cluster_name, String index_name, String document_name,
- int bulk_size, String date_format) throws Exception {
-
- bulk_set = new HashBag();
-
- _LOG.trace("[Metron] Initializing ESBulkAdapter...");
-
- try {
- _ip = ip;
- _port = port;
- _cluster_name = cluster_name;
- _index_name = index_name;
- _document_name = document_name;
- _bulk_size = bulk_size;
-
-
- dateFormat = new SimpleDateFormat(date_format);
-
- System.out.println("Bulk indexing is set to: " + _bulk_size);
-
- ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder() ;
-
- if(tuning_settings != null && tuning_settings.size() > 0)
- {
- builder.put(tuning_settings);
- }
-
- builder.put("cluster.name", _cluster_name);
- builder.put("client.transport.ping_timeout","500s");
-
-
- settings = builder.build();
-
- client = new TransportClient(settings)
- .addTransportAddress(new InetSocketTransportAddress(_ip,
- _port));
-
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-
- /**
- * @param raw_message
- * message to bulk index in Elastic Search
- * @return integer (0) loaded into a bulk queue, (1) bulk indexing executed,
- * (2) error
- */
- @SuppressWarnings("unchecked")
- public int bulkIndex(JSONObject raw_message) {
-
- boolean success = true;
- int set_size = 0;
-
- synchronized (bulk_set) {
- bulk_set.add(raw_message);
- set_size = bulk_set.size();
-
- _LOG.trace("[Metron] Incremented bulk size to: " + bulk_set.size());
- }
-
- try {
-
- if (set_size >= _bulk_size) {
- success = doIndex();
-
- if (success)
- return 1;
- else
- return 2;
- }
-
- return 0;
-
- } catch (Exception e) {
- e.printStackTrace();
- return 2;
- }
- }
-
- public boolean doIndex() throws Exception {
-
- try {
-
- synchronized (bulk_set) {
- if (client == null)
- throw new Exception("client is null");
-
- BulkRequestBuilder bulkRequest = client.prepareBulk();
-
- Iterator<JSONObject> iterator = bulk_set.iterator();
-
- String index_postfix = dateFormat.format(new Date());
-
- while (iterator.hasNext()) {
- JSONObject setElement = iterator.next();
-
- _LOG.trace("[Metron] Flushing to index: " + _index_name+ "_" + index_postfix);
-
- IndexRequestBuilder a = client.prepareIndex(_index_name+ "_" + index_postfix,
- _document_name);
- a.setSource(setElement.toString());
- bulkRequest.add(a);
-
- }
-
- _LOG.trace("[Metron] Performing bulk load of size: "
- + bulkRequest.numberOfActions());
-
- BulkResponse resp = bulkRequest.execute().actionGet();
-
- for(BulkItemResponse r: resp.getItems())
- {
- r.getResponse();
- _LOG.trace("[Metron] ES SUCCESS MESSAGE: " + r.getFailureMessage());
- }
-
-
- bulk_set.clear();
-
- if (resp.hasFailures()) {
- _LOG.error("[Metron] Received bulk response error: "
- + resp.buildFailureMessage());
-
- for(BulkItemResponse r: resp.getItems())
- {
- r.getResponse();
- _LOG.error("[Metron] ES FAILURE MESSAGE: " + r.getFailureMessage());
- }
- }
-
- }
-
- return true;
- }
-
- catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/SolrAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/SolrAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/SolrAdapter.java
deleted file mode 100644
index 13f02f4..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/SolrAdapter.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.indexing.adapters;
-
-public class SolrAdapter {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
deleted file mode 100644
index e8d654d..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.writer;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.metron.domain.SourceConfig;
-import org.apache.metron.writer.interfaces.BulkMessageWriter;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Serializable {
-
- private String clusterName;
- private Map<String, String> optionalSettings;
- private transient TransportClient client;
- private String host;
- private int port;
- private SimpleDateFormat dateFormat;
- private static final Logger LOG = LoggerFactory
- .getLogger(ElasticsearchWriter.class);
-
- public ElasticsearchWriter(String clusterName, String host, int port, String dateFormat) {
- this.clusterName = clusterName;
- this.host = host;
- this.port = port;
- this.dateFormat = new SimpleDateFormat(dateFormat);
- }
-
- public ElasticsearchWriter withOptionalSettings(Map<String, String> optionalSettings) {
- this.optionalSettings = optionalSettings;
- return this;
- }
-
- @Override
- public void init(Map stormConf) {
- ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
- builder.put("cluster.name", clusterName);
- builder.put("client.transport.ping_timeout","500s");
- if (optionalSettings != null) {
- builder.put(optionalSettings);
- }
- client = new TransportClient(builder.build())
- .addTransportAddress(new InetSocketTransportAddress(host, port))
- ;
-
- }
-
- @Override
- public void write(String sourceType, SourceConfig configuration, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
- String indexPostfix = dateFormat.format(new Date());
- BulkRequestBuilder bulkRequest = client.prepareBulk();
- for(JSONObject message: messages) {
- String indexName = sourceType;
- if (configuration != null) {
- indexName = configuration.getIndex();
- }
- IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName + "_index_" + indexPostfix,
- sourceType);
-
- indexRequestBuilder.setSource(message.toJSONString());
- bulkRequest.add(indexRequestBuilder);
- }
- BulkResponse resp = bulkRequest.execute().actionGet();
- if (resp.hasFailures()) {
- throw new Exception(resp.buildFailureMessage());
- }
- }
-
- @Override
- public void close() throws Exception {
- client.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index d2cc827..591f9e3 100644
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -18,7 +18,7 @@
package org.apache.metron.writer.hdfs;
import backtype.storm.tuple.Tuple;
-import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.domain.Configurations;
import org.apache.metron.writer.interfaces.BulkMessageWriter;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
@@ -62,13 +62,13 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
}
@Override
- public void init(Map stormConfig) {
+ public void init(Map stormConfig, Configurations configurations) {
this.stormConfig = stormConfig;
}
@Override
public void write( String sourceType
- , SourceConfig configuration
+ , Configurations configurations
, List<Tuple> tuples
, List<JSONObject> messages
) throws Exception
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/ParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/ParserBolt.java
index 27294ef..abdb207 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/ParserBolt.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/ParserBolt.java
@@ -38,12 +38,12 @@ public class ParserBolt extends ConfiguredBolt {
private MessageParser<JSONObject> parser;
private MessageFilter<JSONObject> filter = new GenericMessageFilter();
private MessageWriter<JSONObject> writer;
- private String sourceType;
+ private String sensorType;
- public ParserBolt(String zookeeperUrl, String sourceType, MessageParser<JSONObject> parser, MessageWriter<JSONObject> writer) {
+ public ParserBolt(String zookeeperUrl, String sensorType, MessageParser<JSONObject> parser, MessageWriter<JSONObject> writer) {
super(zookeeperUrl);
this.parser = parser;
- this.sourceType = sourceType;
+ this.sensorType = sensorType;
this.writer = writer;
}
@@ -70,8 +70,8 @@ public class ParserBolt extends ConfiguredBolt {
for(JSONObject message: messages) {
if (parser.validate(message)) {
if (filter != null && filter.emitTuple(message)) {
- message.put(Constants.SOURCE_TYPE, sourceType);
- writer.write(sourceType, configurations.get(sourceType), tuple, message);
+ message.put(Constants.SENSOR_TYPE, sensorType);
+ writer.write(sensorType, configurations, tuple, message);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/writer/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/writer/KafkaWriter.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/writer/KafkaWriter.java
index 8372e14..ec323d6 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/writer/KafkaWriter.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/writer/KafkaWriter.java
@@ -21,7 +21,7 @@ import backtype.storm.tuple.Tuple;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.metron.Constants;
-import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.domain.Configurations;
import org.apache.metron.writer.interfaces.MessageWriter;
import org.json.simple.JSONObject;
@@ -68,7 +68,7 @@ public class KafkaWriter implements MessageWriter<JSONObject>, Serializable {
@SuppressWarnings("unchecked")
@Override
- public void write(String sourceType, SourceConfig configuration, Tuple tuple, JSONObject message) throws Exception {
+ public void write(String sourceType, Configurations configurations, Tuple tuple, JSONObject message) throws Exception {
kafkaProducer.send(new ProducerRecord<String, String>(Constants.ENRICHMENT_TOPIC, message.toJSONString()));
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/pom.xml b/metron-streaming/Metron-Solr/pom.xml
new file mode 100644
index 0000000..cbb7395
--- /dev/null
+++ b/metron-streaming/Metron-Solr/pom.xml
@@ -0,0 +1,204 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Streaming</artifactId>
+ <version>0.1BETA</version>
+ </parent>
+ <artifactId>Metron-Solr</artifactId>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Common</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-solrj</artifactId>
+ <version>${global_solr_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-test-framework</artifactId>
+ <version>${global_solr_version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Testing</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Topologies</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${global_mockito_version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <!-- Separates the unit tests from the integration tests. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.12.4</version>
+ <configuration>
+ <!-- The default run of this plug-in is skipped (see <skip> below), otherwise everything is run twice; tests are run by the executions defined further down. -->
+ <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
+ <skip>true</skip>
+ <!-- Show 100% of the lines from the stack trace (doesn't work) -->
+ <trimStackTrace>false</trimStackTrace>
+ </configuration>
+ <executions>
+ <execution>
+ <id>unit-tests</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <!-- Never skip running the tests when the test phase is invoked -->
+ <skip>false</skip>
+ <includes>
+ <!-- Include unit tests within (unit) test phase. -->
+ <include>**/*Test.java</include>
+ </includes>
+ <excludes>
+ <!-- Exclude integration tests within (unit) test phase. -->
+ <exclude>**/*IntegrationTest.java</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ <execution>
+ <id>integration-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <!-- Never skip running the tests when the integration-test phase is invoked -->
+ <skip>false</skip>
+ <includes>
+ <!-- Include integration tests within integration-test phase. -->
+ <include>**/*IntegrationTest.java</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <excludes>
+ <exclude>storm:storm-core:*</exclude>
+ <exclude>storm:storm-lib:*</exclude>
+ <exclude>org.slf4j.impl*</exclude>
+ <exclude>org.slf4j:slf4j-log4j*</exclude>
+ </excludes>
+ </artifactSet>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resource>.yaml</resource>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass></mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptor>src/main/assembly/assembly.xml</descriptor>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase> <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/main/assembly/assembly.xml b/metron-streaming/Metron-Solr/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..35cbcc3
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/main/assembly/assembly.xml
@@ -0,0 +1,41 @@
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<assembly>
+ <id>archive</id>
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.basedir}/src/main/resources/Metron_Configs/etc</directory>
+ <outputDirectory>/config/etc</outputDirectory>
+ <useDefaultExcludes>true</useDefaultExcludes>
+ <excludes>
+ <exclude>**/*.formatted</exclude>
+ <exclude>**/*.filtered</exclude>
+ </excludes>
+ <fileMode>0644</fileMode>
+ <lineEnding>unix</lineEnding>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/target</directory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ <outputDirectory>/lib</outputDirectory>
+ <useDefaultExcludes>true</useDefaultExcludes>
+ </fileSet>
+ </fileSets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/solr/SolrConstants.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/solr/SolrConstants.java b/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/solr/SolrConstants.java
new file mode 100644
index 0000000..d5dc7a0
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/solr/SolrConstants.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.solr;
+
+public class SolrConstants {
+
+ public static final String REQUEST_ACTION = "action";
+ public static final String REQUEST_NAME = "name";
+ public static final String REQUEST_NUM_SHARDS = "numShards";
+ public static final String REQUEST_REPLICATION_FACTOR = "replicationFactor";
+ public static final String REQUEST_COLLECTION_CONFIG_NAME = "collection.configName";
+ public static final String REQUEST_COLLECTIONS_PATH = "/admin/collections";
+ public static final String RESPONSE_COLLECTIONS = "collections";
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/MetronSolrClient.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/MetronSolrClient.java b/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/MetronSolrClient.java
new file mode 100644
index 0000000..e0485ab
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/MetronSolrClient.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.solr;
+
+import org.apache.log4j.Logger;
+import org.apache.metron.solr.SolrConstants;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link CloudSolrClient} extension that adds helpers for listing and creating
+ * collections through requests sent to {@link SolrConstants#REQUEST_COLLECTIONS_PATH}.
+ */
+public class MetronSolrClient extends CloudSolrClient {
+
+  private static final Logger LOG = Logger.getLogger(MetronSolrClient.class);
+
+  /**
+   * @param zkHost value handed unchanged to the {@link CloudSolrClient} constructor
+   */
+  public MetronSolrClient(String zkHost) {
+    super(zkHost);
+  }
+
+  /**
+   * Creates the named collection unless {@link #listCollections()} already reports it.
+   *
+   * @param name              collection name; also used as the collection config name
+   * @param numShards         value sent as the {@code numShards} request parameter
+   * @param replicationFactor value sent as the {@code replicationFactor} request parameter
+   * @throws IOException         if the request to Solr fails
+   * @throws SolrServerException if the request to Solr fails
+   */
+  public void createCollection(String name, int numShards, int replicationFactor) throws IOException, SolrServerException {
+    if (listCollections().contains(name)) {
+      return;
+    }
+    LOG.info("Creating Solr collection '" + name + "' (numShards=" + numShards
+            + ", replicationFactor=" + replicationFactor + ")");
+    request(getCreateCollectionsRequest(name, numShards, replicationFactor));
+  }
+
+  /**
+   * Builds (but does not send) the CREATE request for a collection.
+   *
+   * @return a request addressed to the collections admin path
+   */
+  public QueryRequest getCreateCollectionsRequest(String name, int numShards, int replicationFactor) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(SolrConstants.REQUEST_ACTION, CollectionParams.CollectionAction.CREATE.name());
+    params.set(SolrConstants.REQUEST_NAME, name);
+    params.set(SolrConstants.REQUEST_NUM_SHARDS, numShards);
+    params.set(SolrConstants.REQUEST_REPLICATION_FACTOR, replicationFactor);
+    // The config set is looked up under the same name as the collection.
+    params.set(SolrConstants.REQUEST_COLLECTION_CONFIG_NAME, name);
+    QueryRequest request = new QueryRequest(params);
+    request.setPath(SolrConstants.REQUEST_COLLECTIONS_PATH);
+    return request;
+  }
+
+  /**
+   * Asks Solr for the existing collections.
+   *
+   * @return the value of the {@code collections} entry of the response, or an
+   *         empty list when the response carries no such entry (never null)
+   * @throws IOException         if the request to Solr fails
+   * @throws SolrServerException if the request to Solr fails
+   */
+  @SuppressWarnings("unchecked")
+  public List<String> listCollections() throws IOException, SolrServerException {
+    NamedList<Object> response = request(getListCollectionsRequest(), null);
+    List<String> collections = (List<String>) response.get(SolrConstants.RESPONSE_COLLECTIONS);
+    // A missing entry would otherwise surface as a NullPointerException in createCollection().
+    return collections != null ? collections : java.util.Collections.<String>emptyList();
+  }
+
+  /**
+   * Builds (but does not send) the LIST request.
+   *
+   * @return a request addressed to the collections admin path
+   */
+  public QueryRequest getListCollectionsRequest() {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(SolrConstants.REQUEST_ACTION, CollectionParams.CollectionAction.LIST.name());
+    QueryRequest request = new QueryRequest(params);
+    request.setPath(SolrConstants.REQUEST_COLLECTIONS_PATH);
+    return request;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/SolrWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/SolrWriter.java b/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/SolrWriter.java
new file mode 100644
index 0000000..68303ea
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/SolrWriter.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.solr;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.log4j.Logger;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.writer.interfaces.BulkMessageWriter;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable {
+
+ public static final String DEFAULT_COLLECTION = "metron";
+
+ private static final Logger LOG = Logger.getLogger(SolrWriter.class);
+
+ private boolean shouldCommit = false;
+ private MetronSolrClient solr;
+
+ public SolrWriter withShouldCommit(boolean shouldCommit) {
+ this.shouldCommit = shouldCommit;
+ return this;
+ }
+
+ public SolrWriter withMetronSolrClient(MetronSolrClient solr) {
+ this.solr = solr;
+ return this;
+ }
+
+ @Override
+ public void init(Map stormConf, Configurations configurations) throws IOException, SolrServerException {
+ Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
+ if(solr == null) solr = new MetronSolrClient((String) globalConfiguration.get("solr.zookeeper"));
+ String collection = getCollection(configurations);
+ solr.createCollection(collection, (Integer) globalConfiguration.get("solr.numShards"), (Integer) globalConfiguration.get("solr.replicationFactor"));
+ solr.setDefaultCollection(collection);
+ }
+
+ @Override
+ public void write(String sourceType, Configurations configurations, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
+ for(JSONObject message: messages) {
+ SolrInputDocument document = new SolrInputDocument();
+ document.addField("id", getIdValue(message));
+ document.addField("sensorType", sourceType);
+ for(Object key: message.keySet()) {
+ Object value = message.get(key);
+ document.addField(getFieldName(key, value), value);
+ }
+ UpdateResponse response = solr.add(document);
+ }
+ if (shouldCommit) {
+ solr.commit(getCollection(configurations));
+ }
+ }
+
+ protected String getCollection(Configurations configurations) {
+ String collection = (String) configurations.getGlobalConfig().get("solr.collection");
+ return collection != null ? collection : DEFAULT_COLLECTION;
+ }
+
+ private int getIdValue(JSONObject message) {
+ return message.toJSONString().hashCode();
+ }
+
+ protected String getFieldName(Object key, Object value) {
+ String field;
+ if (value instanceof Integer) {
+ field = key + "_i";
+ } else if (value instanceof Long) {
+ field = key + "_l";
+ } else if (value instanceof Float) {
+ field = key + "_f";
+ } else if (value instanceof Double) {
+ field = key + "_d";
+ } else {
+ field = key + "_s";
+ }
+ return field;
+ }
+
+ @Override
+ public void close() throws Exception {
+ solr.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/main/resources/Metron_Configs/etc/env/solr.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/main/resources/Metron_Configs/etc/env/solr.properties b/metron-streaming/Metron-Solr/src/main/resources/Metron_Configs/etc/env/solr.properties
new file mode 100644
index 0000000..df25506
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/main/resources/Metron_Configs/etc/env/solr.properties
@@ -0,0 +1,109 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+##### Kafka #####
+
+kafka.zk=node1:2181
+kafka.broker=node1:6667
+spout.kafka.topic.asa=asa
+spout.kafka.topic.bro=bro
+spout.kafka.topic.fireeye=fireeye
+spout.kafka.topic.ise=ise
+spout.kafka.topic.lancope=lancope
+spout.kafka.topic.paloalto=paloalto
+spout.kafka.topic.pcap=pcap
+spout.kafka.topic.snort=snort
+spout.kafka.topic.yaf=yaf
+
+##### Indexing #####
+writer.class.name=org.apache.metron.writer.solr.SolrWriter
+
+##### ElasticSearch #####
+
+es.ip=10.22.0.214
+es.port=9300
+es.clustername=elasticsearch
+
+##### MySQL #####
+
+mysql.ip=10.22.0.214
+mysql.port=3306
+mysql.username=root
+mysql.password=hadoop123
+
+##### Metrics #####
+
+#reporters
+org.apache.metron.metrics.reporter.graphite=true
+org.apache.metron.metrics.reporter.console=false
+org.apache.metron.metrics.reporter.jmx=false
+
+#Graphite Addresses
+
+org.apache.metron.metrics.graphite.address=localhost
+org.apache.metron.metrics.graphite.port=2023
+
+#TelemetryParserBolt
+org.apache.metron.metrics.TelemetryParserBolt.acks=true
+org.apache.metron.metrics.TelemetryParserBolt.emits=true
+org.apache.metron.metrics.TelemetryParserBolt.fails=true
+
+
+#GenericEnrichmentBolt
+org.apache.metron.metrics.GenericEnrichmentBolt.acks=true
+org.apache.metron.metrics.GenericEnrichmentBolt.emits=true
+org.apache.metron.metrics.GenericEnrichmentBolt.fails=true
+
+
+#TelemetryIndexingBolt
+org.apache.metron.metrics.TelemetryIndexingBolt.acks=true
+org.apache.metron.metrics.TelemetryIndexingBolt.emits=true
+org.apache.metron.metrics.TelemetryIndexingBolt.fails=true
+
+##### Host Enrichment #####
+
+org.apache.metron.enrichment.host.known_hosts=[{"ip":"10.1.128.236", "local":"YES", "type":"webserver", "asset_value" : "important"},\
+{"ip":"10.1.128.237", "local":"UNKNOWN", "type":"unknown", "asset_value" : "important"},\
+{"ip":"10.60.10.254", "local":"YES", "type":"printer", "asset_value" : "important"}]
+
+##### HDFS #####
+
+bolt.hdfs.batch.size=5000
+bolt.hdfs.field.delimiter=|
+bolt.hdfs.file.rotation.size.in.mb=5
+bolt.hdfs.file.system.url=hdfs://iot01.cloud.hortonworks.com:8020
+bolt.hdfs.wip.file.path=/paloalto/wip
+bolt.hdfs.finished.file.path=/paloalto/rotated
+bolt.hdfs.compression.codec.class=org.apache.hadoop.io.compress.SnappyCodec
+index.hdfs.output=/tmp/metron/enriched
+
+##### HBase #####
+bolt.hbase.table.name=pcap
+bolt.hbase.table.fields=t:value
+bolt.hbase.table.key.tuple.field.name=key
+bolt.hbase.table.timestamp.tuple.field.name=timestamp
+bolt.hbase.enable.batching=false
+bolt.hbase.write.buffer.size.in.bytes=2000000
+bolt.hbase.durability=SKIP_WAL
+bolt.hbase.partitioner.region.info.refresh.interval.mins=60
+
+##### Threat Intel #####
+
+threat.intel.tracker.table=
+threat.intel.tracker.cf=
+threat.intel.ip.table=
+threat.intel.ip.cf=
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
new file mode 100644
index 0000000..afeb56b
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+import com.google.common.base.Function;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.integration.util.integration.ComponentRunner;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.metron.integration.util.integration.Processor;
+import org.apache.metron.integration.util.integration.ReadinessState;
+import org.apache.metron.integration.util.integration.components.SolrComponent;
+import org.apache.metron.util.SampleUtil;
+import org.apache.metron.utils.ConfigurationsUtils;
+import org.apache.metron.utils.JSONUtils;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Variant of {@link EnrichmentIntegrationTest} that uses a {@link SolrComponent}
+ * as the search component and {@code SolrWriter} as the index writer.
+ */
+public class SolrEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
+
+  private String collection = "metron";
+
+  /**
+   * Builds the Solr component. Once it has started, its Zookeeper URL is stored
+   * in the topology properties ({@code solr.zk}) and in the global configuration
+   * ({@code solr.zookeeper}), which is then written to the test Zookeeper.
+   */
+  @Override
+  public InMemoryComponent getSearchComponent(final Properties topologyProperties) throws Exception {
+    SolrComponent solrComponent = new SolrComponent.Builder()
+            .addCollection(collection, "../Metron-Solr/src/test/resources/solr/conf")
+            .withPostStartCallback(new Function<SolrComponent, Void>() {
+              @Nullable
+              @Override
+              public Void apply(@Nullable SolrComponent solrComponent) {
+                topologyProperties.setProperty("solr.zk", solrComponent.getZookeeperUrl());
+                try {
+                  String testZookeeperUrl = topologyProperties.getProperty("kafka.zk");
+                  Configurations configurations = SampleUtil.getSampleConfigs();
+                  Map<String, Object> globalConfig = configurations.getGlobalConfig();
+                  globalConfig.put("solr.zookeeper", solrComponent.getZookeeperUrl());
+                  ConfigurationsUtils.writerGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), testZookeeperUrl);
+                } catch (Exception e) {
+                  // Fail fast: without the global config the writer cannot locate Solr,
+                  // and the test would otherwise continue against a broken setup.
+                  throw new IllegalStateException("Unable to write the Solr global configuration to Zookeeper.", e);
+                }
+                return null;
+              }
+            })
+            .build();
+    return solrComponent;
+  }
+
+  /**
+   * Returns a processor that reports READY once the collection exists, holds at
+   * least as many documents as there were input messages, and holds exactly as
+   * many documents as were read back from {@code hdfsDir}.
+   */
+  @Override
+  Processor<List<Map<String, Object>>> getProcessor(final List<byte[]> inputMessages) {
+    return new Processor<List<Map<String, Object>>>() {
+      List<Map<String, Object>> docs = null;
+
+      public ReadinessState process(ComponentRunner runner) {
+        SolrComponent solrComponent = runner.getComponent("search", SolrComponent.class);
+        if (!solrComponent.hasCollection(collection)) {
+          return ReadinessState.NOT_READY;
+        }
+        List<Map<String, Object>> docsFromDisk;
+        try {
+          docs = solrComponent.getAllIndexedDocs(collection);
+          docsFromDisk = EnrichmentIntegrationTest.readDocsFromDisk(hdfsDir);
+          System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
+        } catch (IOException e) {
+          throw new IllegalStateException("Unable to retrieve indexed documents.", e);
+        }
+        if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
+          return ReadinessState.NOT_READY;
+        }
+        return ReadinessState.READY;
+      }
+
+      public List<Map<String, Object>> getResult() {
+        return docs;
+      }
+    };
+  }
+
+  /** Selects {@code SolrWriter} as the writer class of the topology under test. */
+  @Override
+  void setAdditionalProperties(Properties topologyProperties) {
+    topologyProperties.setProperty("writer.class.name", "org.apache.metron.writer.solr.SolrWriter");
+  }
+
+  /** Strips one trailing {@code _d}, {@code _f}, {@code _i}, {@code _l} or {@code _s} suffix from the field name. */
+  @Override
+  public String cleanField(String field) {
+    return field.replaceFirst("_[dfils]$", "");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/util/integration/components/SolrComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/util/integration/components/SolrComponent.java b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/util/integration/components/SolrComponent.java
new file mode 100644
index 0000000..f2b9748
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/util/integration/components/SolrComponent.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration.components;
+
+import com.google.common.base.Function;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.metron.integration.util.integration.UnableToStartException;
+import org.apache.metron.writer.solr.MetronSolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettyConfig;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.common.SolrDocument;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * In-memory Solr component for integration tests, backed by a single-node
+ * {@link MiniSolrCloudCluster}. Instances are created through {@link Builder}.
+ */
+public class SolrComponent implements InMemoryComponent {
+
+  /**
+   * Fluent builder for {@link SolrComponent}. At least one collection must be
+   * registered with {@link #addCollection(String, String)} before {@link #build()}.
+   */
+  public static class Builder {
+    private int port = 8983;
+    private String solrXmlPath = "../Metron-Solr/src/test/resources/solr/solr.xml";
+    private Map<String, String> collections = new HashMap<>();
+    private Function<SolrComponent, Void> postStartCallback;
+
+    /** Sets the port handed to the cluster's Jetty configuration. */
+    public Builder withPort(int port) {
+      this.port = port;
+      return this;
+    }
+
+    /** Sets the path of the solr.xml file used to start the cluster. */
+    public Builder withSolrXmlPath(String solrXmlPath) {
+      this.solrXmlPath = solrXmlPath;
+      return this;
+    }
+
+    /**
+     * Registers a configuration directory to upload on start.
+     *
+     * @param name       name under which the configuration is uploaded
+     * @param configPath path of the configuration directory
+     */
+    public Builder addCollection(String name, String configPath) {
+      collections.put(name, configPath);
+      return this;
+    }
+
+    /** Sets a callback invoked with the component at the end of a successful start. */
+    public Builder withPostStartCallback(Function<SolrComponent, Void> f) {
+      postStartCallback = f;
+      return this;
+    }
+
+    /**
+     * Creates the component.
+     *
+     * @throws Exception if no collection has been registered
+     */
+    public SolrComponent build() throws Exception {
+      if (collections.isEmpty()) {
+        throw new Exception("Must add at least 1 collection");
+      }
+      return new SolrComponent(port, solrXmlPath, collections, postStartCallback);
+    }
+  }
+
+  private final int port;
+  private final String solrXmlPath;
+  private final Map<String, String> collections;
+  private final Function<SolrComponent, Void> postStartCallback;
+  // Null until start() has created the cluster.
+  private MiniSolrCloudCluster miniSolrCloudCluster;
+
+  private SolrComponent(int port, String solrXmlPath, Map<String, String> collections, Function<SolrComponent, Void> postStartCallback) throws Exception {
+    this.port = port;
+    this.solrXmlPath = solrXmlPath;
+    // Copy so that later changes to the builder do not leak into this component.
+    this.collections = new HashMap<>(collections);
+    this.postStartCallback = postStartCallback;
+  }
+
+  /**
+   * Starts a one-node cluster in a fresh temporary directory, uploads every registered
+   * configuration directory, creates the "metron" collection and finally runs the
+   * post-start callback, if one was supplied.
+   *
+   * @throws UnableToStartException wrapping any failure during startup
+   */
+  @Override
+  public void start() throws UnableToStartException {
+    try {
+      File baseDir = Files.createTempDirectory("solrcomponent").toFile();
+      baseDir.deleteOnExit();
+      miniSolrCloudCluster = new MiniSolrCloudCluster(1, baseDir, new File(solrXmlPath), JettyConfig.builder().setPort(port).build());
+      for (Map.Entry<String, String> entry : collections.entrySet()) {
+        miniSolrCloudCluster.uploadConfigDir(new File(entry.getValue()), entry.getKey());
+      }
+      // NOTE(review): only a collection named "metron" (using the "metron" config) is
+      // created, regardless of the names registered through the builder -- confirm
+      // whether every registered name should get its own collection.
+      miniSolrCloudCluster.createCollection("metron", 1, 1, "metron", new HashMap<String, String>());
+      if (postStartCallback != null) {
+        postStartCallback.apply(this);
+      }
+    } catch (Exception e) {
+      throw new UnableToStartException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Shuts the cluster down. Does nothing if the cluster was never created; shutdown
+   * failures are printed and otherwise ignored so that teardown can continue.
+   */
+  @Override
+  public void stop() {
+    if (miniSolrCloudCluster == null) {
+      return;
+    }
+    try {
+      miniSolrCloudCluster.shutdown();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Creates a new client pointed at this cluster's ZooKeeper address. A new instance
+   * is returned on every call; the caller is responsible for closing it.
+   */
+  public MetronSolrClient getSolrClient() {
+    return new MetronSolrClient(getZookeeperUrl());
+  }
+
+  /** Returns the underlying cluster, or null if {@link #start()} has not created it. */
+  public MiniSolrCloudCluster getMiniSolrCloudCluster() {
+    return this.miniSolrCloudCluster;
+  }
+
+  /** Returns the address of the cluster's ZooKeeper server. */
+  public String getZookeeperUrl() {
+    return miniSolrCloudCluster.getZkServer().getZkAddress();
+  }
+
+  /**
+   * Reports whether the cluster currently lists the given collection.
+   *
+   * @return true if the collection is listed; false if it is not listed or the lookup
+   *         failed (the failure is printed)
+   */
+  public boolean hasCollection(String collection) {
+    boolean collectionFound = false;
+    // This method is polled repeatedly, so the short-lived client must be closed
+    // each time instead of being left to leak.
+    try (MetronSolrClient solr = getSolrClient()) {
+      collectionFound = solr.listCollections().contains(collection);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    return collectionFound;
+  }
+
+  /**
+   * Commits and then fetches every document of the given collection.
+   *
+   * <p>The match-all query is issued twice: once with {@code rows=0} to learn the
+   * number of matching documents, then with {@code rows} set to that number. Without
+   * an explicit {@code rows} value Solr returns only its default page size.
+   *
+   * @return the documents found; an empty or partial list if a Solr or I/O error
+   *         occurred (the error is printed)
+   */
+  public List<Map<String, Object>> getAllIndexedDocs(String collection) {
+    List<Map<String, Object>> docs = new ArrayList<>();
+    CloudSolrClient solr = miniSolrCloudCluster.getSolrClient();
+    solr.setDefaultCollection(collection);
+    try {
+      solr.commit();
+      SolrQuery countQuery = new SolrQuery();
+      countQuery.set("q", "*:*");
+      countQuery.setRows(0);
+      long numFound = solr.query(countQuery).getResults().getNumFound();
+      if (numFound > 0) {
+        SolrQuery fetchQuery = new SolrQuery();
+        fetchQuery.set("q", "*:*");
+        fetchQuery.setRows((int) Math.min(numFound, (long) Integer.MAX_VALUE));
+        QueryResponse response = solr.query(fetchQuery);
+        for (SolrDocument solrDocument : response.getResults()) {
+          docs.add(solrDocument);
+        }
+      }
+    } catch (SolrServerException | IOException e) {
+      e.printStackTrace();
+    }
+    return docs;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/MetronSolrClientTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/MetronSolrClientTest.java b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/MetronSolrClientTest.java
new file mode 100644
index 0000000..7bd3ac6
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/MetronSolrClientTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.solr;
+
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.NamedList;
+import org.hamcrest.Description;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class MetronSolrClientTest {
+
+ class CollectionRequestMatcher extends ArgumentMatcher<QueryRequest> {
+
+ private String name;
+
+ public CollectionRequestMatcher(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ QueryRequest queryRequest = (QueryRequest) o;
+ return name.equals(queryRequest.getParams().get("action"));
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText(name);
+ }
+ }
+
+ @Test
+ public void testClient() throws Exception {
+
+ final String collection = "metron";
+ String zookeeperUrl = "zookeeperUrl";
+ MetronSolrClient metronSolrClient = Mockito.spy(new MetronSolrClient(zookeeperUrl));
+
+ Mockito.doReturn(new NamedList<Object>() {{
+ add("collections", new ArrayList<String>() {{
+ add(collection);
+ }});
+ }}).when(metronSolrClient).request(argThat(new CollectionRequestMatcher(CollectionParams.CollectionAction.LIST.name())), (String) isNull());
+ metronSolrClient.createCollection(collection, 1, 1);
+ verify(metronSolrClient, times(1)).request(argThat(new CollectionRequestMatcher(CollectionParams.CollectionAction.LIST.name())), (String) isNull());
+ verify(metronSolrClient, times(0)).request(argThat(new CollectionRequestMatcher(CollectionParams.CollectionAction.CREATE.name())), (String) isNull());
+
+ metronSolrClient = Mockito.spy(new MetronSolrClient(zookeeperUrl));
+ Mockito.doReturn(new NamedList<Object>() {{
+ add("collections", new ArrayList<String>());
+ }}).when(metronSolrClient).request(argThat(new CollectionRequestMatcher(CollectionParams.CollectionAction.LIST.name())), (String) isNull());
+ Mockito.doReturn(new NamedList<>()).when(metronSolrClient).request(argThat(new CollectionRequestMatcher(CollectionParams.CollectionAction.CREATE.name())), (String) isNull());
+ metronSolrClient.createCollection(collection, 1, 1);
+ verify(metronSolrClient, times(1)).request(argThat(new CollectionRequestMatcher(CollectionParams.CollectionAction.LIST.name())), (String) isNull());
+ verify(metronSolrClient, times(1)).request(argThat(new CollectionRequestMatcher(CollectionParams.CollectionAction.CREATE.name())), (String) isNull());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/SolrWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/SolrWriterTest.java b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/SolrWriterTest.java
new file mode 100644
index 0000000..7c720ea
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/SolrWriterTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.solr;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.util.SampleUtil;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.hamcrest.Description;
+import org.json.simple.JSONObject;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+public class SolrWriterTest {
+
+ class CollectionRequestMatcher extends ArgumentMatcher<QueryRequest> {
+
+ private String name;
+
+ public CollectionRequestMatcher(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ QueryRequest queryRequest = (QueryRequest) o;
+ return name.equals(queryRequest.getParams().get("action"));
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText(name);
+ }
+ }
+
+ class SolrInputDocumentMatcher extends ArgumentMatcher<SolrInputDocument> {
+
+ private int expectedId;
+ private String expectedSourceType;
+ private int expectedInt;
+ private double expectedDouble;
+
+ public SolrInputDocumentMatcher(int expectedId, String expectedSourceType, int expectedInt, double expectedDouble) {
+ this.expectedId = expectedId;
+ this.expectedSourceType = expectedSourceType;
+ this.expectedInt = expectedInt;
+ this.expectedDouble = expectedDouble;
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ SolrInputDocument solrInputDocument = (SolrInputDocument) o;
+ int actualId = (Integer) solrInputDocument.get("id").getValue();
+ String actualName = (String) solrInputDocument.get("sensorType").getValue();
+ int actualInt = (Integer) solrInputDocument.get("intField_i").getValue();
+ double actualDouble = (Double) solrInputDocument.get("doubleField_d").getValue();
+ return expectedId == actualId && expectedSourceType.equals(actualName) && expectedInt == actualInt && expectedDouble == actualDouble;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText(String.format("fields: [id=%d, doubleField_d=%f, name=%s, intField_i=%d]", expectedId, expectedDouble, expectedSourceType, expectedInt));
+ }
+
+ }
+
+ @Test
+ public void testWriter() throws Exception {
+ Configurations configurations = SampleUtil.getSampleConfigs();
+ JSONObject message1 = new JSONObject();
+ message1.put("intField", 100);
+ message1.put("doubleField", 100.0);
+ JSONObject message2 = new JSONObject();
+ message2.put("intField", 200);
+ message2.put("doubleField", 200.0);
+ List<JSONObject> messages = new ArrayList<>();
+ messages.add(message1);
+ messages.add(message2);
+
+ String collection = "metron";
+ MetronSolrClient solr = Mockito.mock(MetronSolrClient.class);
+ SolrWriter writer = new SolrWriter().withMetronSolrClient(solr);
+ writer.init(null, configurations);
+ verify(solr, times(1)).createCollection(collection, 1, 1);
+ verify(solr, times(1)).setDefaultCollection(collection);
+
+ collection = "metron2";
+ int numShards = 4;
+ int replicationFactor = 2;
+ Map<String, Object> globalConfig = configurations.getGlobalConfig();
+ globalConfig.put("solr.collection", collection);
+ globalConfig.put("solr.numShards", numShards);
+ globalConfig.put("solr.replicationFactor", replicationFactor);
+ configurations.updateGlobalConfig(globalConfig);
+ writer = new SolrWriter().withMetronSolrClient(solr);
+ writer.init(null, configurations);
+ verify(solr, times(1)).createCollection(collection, numShards, replicationFactor);
+ verify(solr, times(1)).setDefaultCollection(collection);
+
+ writer.write("test", configurations, new ArrayList<Tuple>(), messages);
+ verify(solr, times(1)).add(argThat(new SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 100.0)));
+ verify(solr, times(1)).add(argThat(new SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 200.0)));
+ verify(solr, times(0)).commit(collection);
+
+ writer = new SolrWriter().withMetronSolrClient(solr).withShouldCommit(true);
+ writer.init(null, configurations);
+ writer.write("test", configurations, new ArrayList<Tuple>(), messages);
+ verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 100.0)));
+ verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 200.0)));
+ verify(solr, times(1)).commit(collection);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/test/resources/solr/conf/_rest_managed.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/test/resources/solr/conf/_rest_managed.json b/metron-streaming/Metron-Solr/src/test/resources/solr/conf/_rest_managed.json
new file mode 100644
index 0000000..6a4aec3
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/test/resources/solr/conf/_rest_managed.json
@@ -0,0 +1 @@
+{"initArgs":{},"managedList":[]}