You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/06/13 18:11:34 UTC

incubator-metron git commit: METRON-217 The elasticsearch writer should allow multiple ES hosts to be passed in via a List. This should be backwards compatible. METRON-217 The parser topology numSpoutTasks and numParserTasks are swapped METRON-217 The grok parser should throw an exception if it cannot parse so the message that could not be parsed is sent to the error queue.

Repository: incubator-metron
Updated Branches:
  refs/heads/master 6e22dd86a -> ee0eb5bbb


METRON-217 The elasticsearch writer should allow multiple ES hosts to be passed in via a List. This should be backwards compatible.
METRON-217 The parser topology numSpoutTasks and numParserTasks are swapped
METRON-217 The grok parser should throw an exception if it cannot parse so the message that could not be parsed is sent to the error queue.
This closes apache/incubator-metron#149


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/ee0eb5bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/ee0eb5bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/ee0eb5bb

Branch: refs/heads/master
Commit: ee0eb5bbbdf7d03180fba09f7fee3fee5d4299bb
Parents: 6e22dd8
Author: cstella <ce...@gmail.com>
Authored: Mon Jun 13 14:09:32 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Mon Jun 13 14:09:32 2016 -0400

----------------------------------------------------------------------
 .../apache/metron/common/utils/ErrorUtils.java  | 24 +++++++
 .../common/writer/BulkWriterComponent.java      |  4 +-
 .../writer/ElasticsearchWriter.java             | 67 +++++++++++++++++---
 .../enrichment/bolt/BulkMessageWriterBolt.java  | 22 ++++++-
 .../enrichment/bolt/EnrichmentJoinBolt.java     |  2 +-
 .../enrichment/bolt/EnrichmentSplitterBolt.java |  2 +-
 .../metron/enrichment/bolt/SplitBolt.java       |  2 +-
 .../org/apache/metron/parsers/GrokParser.java   |  5 +-
 .../parsers/topology/ParserTopologyBuilder.java |  4 +-
 .../websphere/GrokWebSphereParserTest.java      |  3 +-
 10 files changed, 112 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
index 6e139c8..a914e15 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
@@ -17,6 +17,9 @@
  */
 package org.apache.metron.common.utils;
 
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
@@ -61,4 +64,25 @@ public class ErrorUtils {
 		collector.emit(errorStream, new Values(error));
 		collector.reportError(t);
 	}
+
+	public static String generateThreadDump() {
+		final StringBuilder dump = new StringBuilder();
+		final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+		final ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
+		for (ThreadInfo threadInfo : threadInfos) {
+			dump.append('"');
+			dump.append(threadInfo.getThreadName());
+			dump.append("\" ");
+			final Thread.State state = threadInfo.getThreadState();
+			dump.append("\n   java.lang.Thread.State: ");
+			dump.append(state);
+			final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
+			for (final StackTraceElement stackTraceElement : stackTraceElements) {
+				dump.append("\n        at ");
+				dump.append(stackTraceElement);
+			}
+			dump.append("\n\n");
+		}
+		return dump.toString();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
index ad437d1..7d66ce1 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
@@ -58,7 +58,7 @@ public class BulkWriterComponent<MESSAGE_T> {
     }
   }
 
-  public void error(Exception e, Iterable<Tuple> tuples) {
+  public void error(Throwable e, Iterable<Tuple> tuples) {
     tuples.forEach(t -> collector.ack(t));
     LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
     ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
@@ -98,7 +98,7 @@ public class BulkWriterComponent<MESSAGE_T> {
           commit(tupleList);
         }
 
-      } catch (Exception e) {
+      } catch (Throwable e) {
         if(handleError) {
           error(e, tupleList);
         }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index c982d29..8b9fd60 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -18,6 +18,10 @@
 package org.apache.metron.elasticsearch.writer;
 
 import backtype.storm.tuple.Tuple;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
@@ -37,9 +41,7 @@ import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Serializable {
 
@@ -72,11 +74,12 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
     Settings settings = settingsBuilder.build();
 
     try{
-
-      client = TransportClient.builder().settings(settings).build()
-              .addTransportAddress(
-                      new InetSocketTransportAddress(InetAddress.getByName(globalConfiguration.get("es.ip").toString()), Integer.parseInt(globalConfiguration.get("es.port").toString()) )
-              );
+      for(HostnamePort hp : getIps(globalConfiguration)) {
+        client = TransportClient.builder().settings(settings).build()
+                .addTransportAddress(
+                        new InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port)
+                );
+      }
 
 
     } catch (UnknownHostException exception){
@@ -88,6 +91,47 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
 
   }
 
+  public static class HostnamePort {
+    String hostname;
+    Integer port;
+    public HostnamePort(String hostname, Integer port) {
+      this.hostname = hostname;
+      this.port = port;
+    }
+  }
+  List<HostnamePort> getIps(Map<String, Object> globalConfiguration) {
+    Object ipObj = globalConfiguration.get("es.ip");
+    Object portObj = globalConfiguration.get("es.port");
+    if(ipObj == null) {
+      return Collections.emptyList();
+    }
+    if(ipObj instanceof String
+    && !ipObj.toString().contains(":")
+      ) {
+      return ImmutableList.of(new HostnamePort(ipObj.toString(), Integer.parseInt(portObj + "")));
+    }
+    else if(ipObj instanceof String
+        && ipObj.toString().contains(":")
+           ) {
+      Iterable<String> tokens = Splitter.on(":").split(ipObj.toString());
+      String host = Iterables.getFirst(tokens, null);
+      String portStr = Iterables.getLast(tokens, null);
+      return ImmutableList.of(new HostnamePort(host, Integer.parseInt(portStr)));
+    }
+    else if(ipObj instanceof List) {
+      List<String> ips = (List)ipObj;
+      List<HostnamePort> ret = new ArrayList<>();
+      for(String ip : ips) {
+        Iterable<String> tokens = Splitter.on(":").split(ip);
+        String host = Iterables.getFirst(tokens, null);
+        String portStr = Iterables.getLast(tokens, null);
+        ret.add(new HostnamePort(host, Integer.parseInt(portStr)));
+      }
+      return ret;
+    }
+    throw new IllegalStateException("Unable to read the elasticsearch ips, expected es.ip to be either a list of strings, a string hostname or a host:port string");
+  }
+
   @Override
   public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
     String indexPostfix = dateFormat.format(new Date());
@@ -112,7 +156,12 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
 
       IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName,
               sensorType + "_doc");
-      indexRequestBuilder.setSource(esDoc.toJSONString()).setTimestamp(esDoc.get("timestamp").toString());
+
+      indexRequestBuilder = indexRequestBuilder.setSource(esDoc.toJSONString());
+      Object ts = esDoc.get("timestamp");
+      if(ts != null) {
+        indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString());
+      }
       bulkRequest.add(indexRequestBuilder);
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
index 1d49807..3a063d3 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
@@ -25,6 +25,7 @@ import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
 import org.apache.metron.common.configuration.writer.EnrichmentWriterConfiguration;
+import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.common.interfaces.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterComponent;
@@ -60,18 +61,33 @@ public class BulkMessageWriterBolt extends ConfiguredEnrichmentBolt {
     }
   }
 
+  private JSONObject cloneMessage(Tuple tuple) {
+    JSONObject ret = new JSONObject();
+    JSONObject message = (JSONObject) tuple.getValueByField("message");
+    try {
+      for (Iterator<Map.Entry<String, Object>> it = message.entrySet().iterator(); it.hasNext(); ) {
+        Map.Entry<String, Object> kv = it.next();
+        ret.put(kv.getKey(), kv.getValue());
+      }
+    }
+    catch(ConcurrentModificationException cme) {
+      LOG.error(cme.getMessage() + "\n" + ErrorUtils.generateThreadDump(), cme);
+      throw cme;
+    }
+    return ret;
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
-    JSONObject message = (JSONObject)((JSONObject) tuple.getValueByField("message")).clone();
-    message.put("index." + bulkMessageWriter.getClass().getSimpleName().toLowerCase() + ".ts", "" + System.currentTimeMillis());
+    JSONObject message = cloneMessage(tuple);
     String sensorType = MessageUtils.getSensorType(message);
     try
     {
       writerComponent.write(sensorType, tuple, message, bulkMessageWriter, new EnrichmentWriterConfiguration(getConfigurations()));
     }
     catch(Exception e) {
-      throw new RuntimeException("This should have been caught in the writerComponent.  If you see this, file a JIRA");
+      throw new RuntimeException("This should have been caught in the writerComponent.  If you see this, file a JIRA", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index 7d05c00..9bb91af 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -81,7 +81,7 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
       message.remove(o);
     }
     message.put(getClass().getSimpleName().toLowerCase() + ".joiner.ts", "" + System.currentTimeMillis());
-    return message;
+    return (JSONObject) message.clone();
   }
 
   public Map<String, List<String>> getFieldMap(String sourceType) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
index 4b5c7bb..b599ea9 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
@@ -90,7 +90,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
             message = (JSONObject) tuple.getValueByField(messageFieldName);
             message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());
         }
-        return message;
+        return (JSONObject)message.clone();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
index 3063b7f..7739edc 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
@@ -29,7 +29,7 @@ import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
 import java.util.Map;
 import java.util.Set;
 
-public abstract class SplitBolt<T> extends
+public abstract class SplitBolt<T extends Cloneable> extends
         ConfiguredEnrichmentBolt {
 
   protected OutputCollector collector;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
index 0379080..da72da8 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
@@ -135,8 +135,9 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
       init();
     }
     List<JSONObject> messages = new ArrayList<>();
+    String originalMessage = null;
     try {
-      String originalMessage = new String(rawMessage, "UTF-8");
+      originalMessage = new String(rawMessage, "UTF-8");
       if (LOG.isDebugEnabled()) {
         LOG.debug("Grok perser parsing message: " + originalMessage);
       }
@@ -168,7 +169,7 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
       }
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      throw new RuntimeException("Grok parser Error: " + e.getMessage() + " on " + originalMessage , e);
     }
     return messages;
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index ad6068f..4b357be 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -64,7 +64,7 @@ public class ParserTopologyBuilder {
     SpoutConfig spoutConfig = new SpoutConfig(zkHosts, sensorTopic, "", sensorTopic).from(offset);
     KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
     builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism)
-           .setNumTasks(parserNumTasks);
+           .setNumTasks(spoutNumTasks);
     MessageParser<JSONObject> parser = ReflectionUtils.createInstance(sensorParserConfig.getParserClassName());
     parser.configure(sensorParserConfig.getParserConfig());
     ParserBolt parserBolt = null;
@@ -89,7 +89,7 @@ public class ParserTopologyBuilder {
       }
     }
     builder.setBolt("parserBolt", parserBolt, parserParallelism)
-           .setNumTasks(spoutNumTasks)
+           .setNumTasks(parserNumTasks)
            .shuffleGrouping("kafkaSpout");
     return builder;
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ee0eb5bb/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
index a4789cd..87afe10 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
@@ -225,7 +225,7 @@ public class GrokWebSphereParserTest {
 	}
 	
 	
-	@Test
+	@Test(expected=RuntimeException.class)
 	public void testParseEmptyLine() throws Exception {
 		
 		//Set up parser, attempt to parse malformed message
@@ -233,7 +233,6 @@ public class GrokWebSphereParserTest {
 		parser.configure(parserConfig);
 		String testString = "";
 		List<JSONObject> result = parser.parse(testString.getBytes());		
-		assertEquals(null, result);
 	}
 		
 }