You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/06/13 18:11:34 UTC
incubator-metron git commit: METRON-217 The elasticsearch writer
should allow multiple ES hosts to be passed in via a List. This should be
backwards compatible. METRON-217 The parser topology numSpoutTasks and
numParserTasks are swapped. METRON-217 The grok parser should throw an exception if it cannot parse (subject truncated; see the full commit message below)
Repository: incubator-metron
Updated Branches:
refs/heads/master 6e22dd86a -> ee0eb5bbb
METRON-217 The elasticsearch writer should allow multiple ES hosts to be passed in via a List. This should be backwards compatible.
METRON-217 The parser topology numSpoutTasks and numParserTasks are swapped.
METRON-217 The grok parser should throw an exception if it cannot parse so the message that could not be parsed is sent to the error queue.
This closes apache/incubator-metron#149
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/ee0eb5bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/ee0eb5bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/ee0eb5bb
Branch: refs/heads/master
Commit: ee0eb5bbbdf7d03180fba09f7fee3fee5d4299bb
Parents: 6e22dd8
Author: cstella <ce...@gmail.com>
Authored: Mon Jun 13 14:09:32 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Mon Jun 13 14:09:32 2016 -0400
----------------------------------------------------------------------
.../apache/metron/common/utils/ErrorUtils.java | 24 +++++++
.../common/writer/BulkWriterComponent.java | 4 +-
.../writer/ElasticsearchWriter.java | 67 +++++++++++++++++---
.../enrichment/bolt/BulkMessageWriterBolt.java | 22 ++++++-
.../enrichment/bolt/EnrichmentJoinBolt.java | 2 +-
.../enrichment/bolt/EnrichmentSplitterBolt.java | 2 +-
.../metron/enrichment/bolt/SplitBolt.java | 2 +-
.../org/apache/metron/parsers/GrokParser.java | 5 +-
.../parsers/topology/ParserTopologyBuilder.java | 4 +-
.../websphere/GrokWebSphereParserTest.java | 3 +-
10 files changed, 112 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
index 6e139c8..a914e15 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
@@ -17,6 +17,9 @@
*/
package org.apache.metron.common.utils;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -61,4 +64,25 @@ public class ErrorUtils {
collector.emit(errorStream, new Values(error));
collector.reportError(t);
}
+
+ /**
+ * Renders a thread dump of this JVM (laid out similarly to jstack output),
+ * capturing at most 100 stack frames per thread.
+ *
+ * @return the formatted dump; never null
+ */
+ public static String generateThreadDump() {
+ return generateThreadDump(100);
+ }
+
+ /**
+ * Renders a thread dump of this JVM: for each thread, its name, its
+ * {@link Thread.State} and its stack frames.
+ * <p>
+ * {@link ThreadMXBean#getThreadInfo(long[], int)} returns {@code null} for a thread
+ * that terminated between {@code getAllThreadIds()} and the info call; such
+ * threads are skipped instead of causing a NullPointerException while we are
+ * already reporting another error.
+ *
+ * @param maxStackDepth maximum number of stack frames captured per thread (must be >= 0)
+ * @return the formatted dump; never null
+ * @throws IllegalArgumentException if {@code maxStackDepth} is negative
+ */
+ public static String generateThreadDump(int maxStackDepth) {
+ final StringBuilder dump = new StringBuilder();
+ final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+ final ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), maxStackDepth);
+ for (ThreadInfo threadInfo : threadInfos) {
+ if (threadInfo == null) {
+ continue; // thread exited after its id was collected
+ }
+ dump.append('"');
+ dump.append(threadInfo.getThreadName());
+ dump.append("\" ");
+ dump.append("\n java.lang.Thread.State: ");
+ dump.append(threadInfo.getThreadState());
+ for (final StackTraceElement stackTraceElement : threadInfo.getStackTrace()) {
+ dump.append("\n at ");
+ dump.append(stackTraceElement);
+ }
+ dump.append("\n\n");
+ }
+ return dump.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
index ad437d1..7d66ce1 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
@@ -58,7 +58,7 @@ public class BulkWriterComponent<MESSAGE_T> {
}
}
- public void error(Exception e, Iterable<Tuple> tuples) {
+ public void error(Throwable e, Iterable<Tuple> tuples) {
tuples.forEach(t -> collector.ack(t));
LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
@@ -98,7 +98,7 @@ public class BulkWriterComponent<MESSAGE_T> {
commit(tupleList);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
if(handleError) {
error(e, tupleList);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index c982d29..8b9fd60 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -18,6 +18,10 @@
package org.apache.metron.elasticsearch.writer;
import backtype.storm.tuple.Tuple;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
@@ -37,9 +41,7 @@ import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Serializable {
@@ -72,11 +74,12 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
Settings settings = settingsBuilder.build();
try{
-
- client = TransportClient.builder().settings(settings).build()
- .addTransportAddress(
- new InetSocketTransportAddress(InetAddress.getByName(globalConfiguration.get("es.ip").toString()), Integer.parseInt(globalConfiguration.get("es.port").toString()) )
- );
+ // Resolve every configured host first so a bad hostname fails before any client exists.
+ List<InetSocketTransportAddress> addresses = new ArrayList<>();
+ for(HostnamePort hp : getIps(globalConfiguration)) {
+ addresses.add(new InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port));
+ }
+ // Build ONE client and register all hosts on it. Building a client per host would keep
+ // only the last host and leak every earlier client (they are never closed).
+ TransportClient transportClient = TransportClient.builder().settings(settings).build();
+ for(InetSocketTransportAddress address : addresses) {
+ transportClient.addTransportAddress(address);
+ }
+ client = transportClient;
} catch (UnknownHostException exception){
@@ -88,6 +91,47 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
}
+ /**
+ * Holder for one Elasticsearch transport endpoint (host name and port)
+ * as read from the global configuration by getIps.
+ */
+ public static class HostnamePort {
+ String hostname;
+ Integer port;
+ public HostnamePort(String host, Integer transportPort) {
+ hostname = host;
+ port = transportPort;
+ }
+ }
+ /**
+ * Reads the Elasticsearch transport endpoints from the global configuration.
+ * <p>
+ * {@code es.ip} may be:
+ * <ul>
+ * <li>a bare hostname, whose port is taken from {@code es.port};</li>
+ * <li>a {@code host:port} string;</li>
+ * <li>a list whose entries are either of the above (bare entries fall back to {@code es.port}).</li>
+ * </ul>
+ *
+ * @param globalConfiguration the global configuration map
+ * @return the configured endpoints; empty if {@code es.ip} is absent
+ * @throws IllegalStateException if {@code es.ip} has an unsupported type or a port is missing
+ * @throws NumberFormatException if a port is not a valid integer
+ */
+ List<HostnamePort> getIps(Map<String, Object> globalConfiguration) {
+ Object ipObj = globalConfiguration.get("es.ip");
+ Object portObj = globalConfiguration.get("es.port");
+ if(ipObj == null) {
+ return Collections.emptyList();
+ }
+ if(ipObj instanceof String) {
+ return ImmutableList.of(toHostnamePort(ipObj.toString(), portObj));
+ }
+ if(ipObj instanceof List) {
+ List<HostnamePort> ret = new ArrayList<>();
+ for(Object ip : (List<?>) ipObj) {
+ ret.add(toHostnamePort(ip.toString(), portObj));
+ }
+ return ret;
+ }
+ throw new IllegalStateException("Unable to read the elasticsearch ips, expected es.ip to be either a list of strings, a string hostname or a host:port string");
+ }
+
+ /**
+ * Parses {@code host} or {@code host:port}; a bare host uses {@code defaultPort}
+ * (the {@code es.port} setting). Tokens are trimmed of surrounding whitespace.
+ */
+ private static HostnamePort toHostnamePort(String ip, Object defaultPort) {
+ Iterable<String> tokens = Splitter.on(":").trimResults().split(ip);
+ String host = Iterables.getFirst(tokens, null);
+ Object port = Iterables.size(tokens) > 1 ? Iterables.getLast(tokens) : defaultPort;
+ if(port == null) {
+ throw new IllegalStateException("No port configured for elasticsearch host '" + host + "': set es.port or use host:port");
+ }
+ return new HostnamePort(host, Integer.parseInt(port.toString().trim()));
+ }
+
@Override
public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
String indexPostfix = dateFormat.format(new Date());
@@ -112,7 +156,12 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName,
sensorType + "_doc");
- indexRequestBuilder.setSource(esDoc.toJSONString()).setTimestamp(esDoc.get("timestamp").toString());
+
+ indexRequestBuilder = indexRequestBuilder.setSource(esDoc.toJSONString());
+ Object ts = esDoc.get("timestamp");
+ if(ts != null) {
+ indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString());
+ }
bulkRequest.add(indexRequestBuilder);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
index 1d49807..3a063d3 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
@@ -25,6 +25,7 @@ import backtype.storm.tuple.Tuple;
import org.apache.metron.common.Constants;
import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
import org.apache.metron.common.configuration.writer.EnrichmentWriterConfiguration;
+import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.common.utils.MessageUtils;
import org.apache.metron.common.interfaces.BulkMessageWriter;
import org.apache.metron.common.writer.BulkWriterComponent;
@@ -60,18 +61,33 @@ public class BulkMessageWriterBolt extends ConfiguredEnrichmentBolt {
}
}
+ /**
+ * Returns a shallow copy of the tuple's "message" field, so later changes to the
+ * copy do not touch the object carried by the tuple.
+ * <p>
+ * If the source map is modified while it is being copied, the resulting
+ * ConcurrentModificationException is logged together with a full thread dump
+ * (presumably to help locate the concurrent writer) and then rethrown.
+ */
+ private JSONObject cloneMessage(Tuple tuple) {
+ JSONObject source = (JSONObject) tuple.getValueByField("message");
+ JSONObject copy = new JSONObject();
+ try {
+ Iterator<Map.Entry<String, Object>> entries = source.entrySet().iterator();
+ while (entries.hasNext()) {
+ Map.Entry<String, Object> entry = entries.next();
+ copy.put(entry.getKey(), entry.getValue());
+ }
+ } catch (ConcurrentModificationException concurrentChange) {
+ LOG.error(concurrentChange.getMessage() + "\n" + ErrorUtils.generateThreadDump(), concurrentChange);
+ throw concurrentChange;
+ }
+ return copy;
+ }
+
+ /**
+ * Copies the incoming message (via cloneMessage), derives its sensor type and
+ * hands it to the bulk writer component together with the current enrichment
+ * writer configuration.
+ * <p>
+ * NOTE(review): this version no longer stamps "index.<writer>.ts" onto the
+ * message as the previous implementation did — confirm that is intentional.
+ */
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
- JSONObject message = (JSONObject)((JSONObject) tuple.getValueByField("message")).clone();
- message.put("index." + bulkMessageWriter.getClass().getSimpleName().toLowerCase() + ".ts", "" + System.currentTimeMillis());
+ JSONObject message = cloneMessage(tuple);
String sensorType = MessageUtils.getSensorType(message);
try
{
writerComponent.write(sensorType, tuple, message, bulkMessageWriter, new EnrichmentWriterConfiguration(getConfigurations()));
}
catch(Exception e) {
+ // The writer component is expected to handle write failures itself; reaching
+ // this point indicates a bug, so the cause is preserved for diagnosis.
- throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA");
+ throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index 7d05c00..9bb91af 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -81,7 +81,7 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
message.remove(o);
}
message.put(getClass().getSimpleName().toLowerCase() + ".joiner.ts", "" + System.currentTimeMillis());
- return message;
+ return (JSONObject) message.clone();
}
public Map<String, List<String>> getFieldMap(String sourceType) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
index 4b5c7bb..b599ea9 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
@@ -90,7 +90,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
message = (JSONObject) tuple.getValueByField(messageFieldName);
message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());
}
- return message;
+ return (JSONObject)message.clone();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
index 3063b7f..7739edc 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
@@ -29,7 +29,7 @@ import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
import java.util.Map;
import java.util.Set;
-public abstract class SplitBolt<T> extends
+public abstract class SplitBolt<T extends Cloneable> extends
ConfiguredEnrichmentBolt {
protected OutputCollector collector;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
index 0379080..da72da8 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
@@ -135,8 +135,9 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
init();
}
List<JSONObject> messages = new ArrayList<>();
+ String originalMessage = null;
try {
- String originalMessage = new String(rawMessage, "UTF-8");
+ originalMessage = new String(rawMessage, "UTF-8");
if (LOG.isDebugEnabled()) {
LOG.debug("Grok perser parsing message: " + originalMessage);
}
@@ -168,7 +169,7 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
- return null;
+ throw new RuntimeException("Grok parser Error: " + e.getMessage() + " on " + originalMessage , e);
}
return messages;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index ad6068f..4b357be 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -64,7 +64,7 @@ public class ParserTopologyBuilder {
SpoutConfig spoutConfig = new SpoutConfig(zkHosts, sensorTopic, "", sensorTopic).from(offset);
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism)
- .setNumTasks(parserNumTasks);
+ .setNumTasks(spoutNumTasks);
MessageParser<JSONObject> parser = ReflectionUtils.createInstance(sensorParserConfig.getParserClassName());
parser.configure(sensorParserConfig.getParserConfig());
ParserBolt parserBolt = null;
@@ -89,7 +89,7 @@ public class ParserTopologyBuilder {
}
}
builder.setBolt("parserBolt", parserBolt, parserParallelism)
- .setNumTasks(spoutNumTasks)
+ .setNumTasks(parserNumTasks)
.shuffleGrouping("kafkaSpout");
return builder;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
index a4789cd..87afe10 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
@@ -225,7 +225,7 @@ public class GrokWebSphereParserTest {
}
- @Test
+ @Test(expected=RuntimeException.class)
public void testParseEmptyLine() throws Exception {
//Set up parser, attempt to parse malformed message
@@ -233,7 +233,6 @@ public class GrokWebSphereParserTest {
parser.configure(parserConfig);
String testString = "";
List<JSONObject> result = parser.parse(testString.getBytes());
- assertEquals(null, result);
}
}