You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@metron.apache.org by nickwallen <gi...@git.apache.org> on 2018/03/05 23:39:39 UTC
[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...
Github user nickwallen commented on a diff in the pull request:
https://github.com/apache/metron/pull/940#discussion_r172353404
--- Diff: metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java ---
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.performance.PerformanceLogger;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.enrichment.configuration.Enrichment;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.parallel.EnrichmentContext;
+import org.apache.metron.enrichment.parallel.EnrichmentStrategies;
+import org.apache.metron.enrichment.parallel.ParallelEnricher;
+import org.apache.metron.enrichment.parallel.WorkerPoolStrategy;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This bolt is a unified enrichment/threat intel bolt. In contrast to the split/enrich/join
+ * bolts above, this handles the entire enrichment lifecycle in one bolt using a threadpool to
+ * enrich in parallel.
+ *
+ * From an architectural perspective, this is a divergence from the polymorphism based strategy we have
+ * used in the split/join bolts. Rather, this bolt is provided a strategy to use, either enrichment or threat intel,
+ * through composition. This allows us to move most of the implementation into components independent
+ * from Storm. This will greatly facilitate reuse.
+ */
+public class UnifiedEnrichmentBolt extends ConfiguredEnrichmentBolt {
+
+ public static class Perf {} // used for performance logging
+ private PerformanceLogger perfLog; // not static because multiple bolts may exist in the same worker
+
+ protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final String STELLAR_CONTEXT_CONF = "stellarContext";
+
+ /**
+ * The number of threads in the threadpool. One threadpool is created per process.
+ * This is a topology-level configuration.
+ */
+ public static final String THREADPOOL_NUM_THREADS_TOPOLOGY_CONF = "metron.threadpool.size";
+ /**
+ * The type of threadpool to create. This is a topology-level configuration.
+ */
+ public static final String THREADPOOL_TYPE_TOPOLOGY_CONF = "metron.threadpool.type";
+
+ /**
+ * The enricher implementation to use. This will do the parallel enrichment via a thread pool.
+ */
+ protected ParallelEnricher enricher;
+
+ /**
+ * The strategy to use for this enrichment bolt. Practically speaking this is either
+ * enrichment or threat intel. It is configured in the topology itself.
+ */
+ protected EnrichmentStrategies strategy;
+ private JSONParser parser;
+ protected OutputCollector collector;
+ private Context stellarContext;
+ /**
+ * An enrichment type to adapter map. This is configured externally.
+ */
+ protected Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType = new HashMap<>();
+
+ /**
+ * The total number of elements in an LRU cache. This cache is used for the enrichments; if an
+ * element is in the cache, then the result is returned instead of computed.
+ */
+ protected Long maxCacheSize;
+ /**
+ * The total amount of time in minutes since write to keep an element in the cache.
+ */
+ protected Long maxTimeRetain;
+ /**
+ * Whether to invalidate the cache when the bolt's configuration is reloaded. Defaults to false.
+ */
+ protected boolean invalidateCacheOnReload = false;
+
+ /**
+ * The message field to use. If this is set, then this indicates the field to use to retrieve the message object.
+ * If this is unset, then we presume that the message is coming in as a string version of a JSON blob on the first
+ * element of the tuple.
+ */
+ protected String messageFieldName;
+ protected EnrichmentContext enrichmentContext;
+ protected boolean captureCacheStats = true;
+
+ /**
+ * Creates the bolt. The remaining settings are supplied through the fluent {@code with...} methods.
+ * @param zookeeperUrl The Zookeeper URL; it is handed unchanged to the ConfiguredEnrichmentBolt constructor.
+ */
+ public UnifiedEnrichmentBolt(String zookeeperUrl) {
+   super(zookeeperUrl);
+ }
+
+ /**
+ * Register the enrichments this bolt supports. Each enrichment's adapter is stored under the
+ * enrichment's type, so a later entry with the same type replaces an earlier one.
+ * @param enrichments The enrichments to register
+ * @return Instance of this class
+ */
+ public UnifiedEnrichmentBolt withEnrichments(List<Enrichment> enrichments) {
+   enrichments.forEach(enrichment ->
+       enrichmentsByType.put(enrichment.getType(), enrichment.getAdapter()));
+   return this;
+ }
+
+ /**
+ * Set the capture-cache-stats flag. The flag is {@code true} unless this method is called.
+ * @param captureCacheStats The new value of the flag
+ * @return Instance of this class
+ */
+ public UnifiedEnrichmentBolt withCaptureCacheStats(boolean captureCacheStats) {
+   this.captureCacheStats = captureCacheStats;
+   return this;
+ }
+
+ /**
+ * Figure out how many threads to use in the thread pool. The user can pass an arbitrary object, so it is
+ * parsed according to the following rules:
+ * <ul>
+ * <li>A {@link Number} is converted with {@code intValue()}.</li>
+ * <li>A string ending in "C" or "c" (surrounding whitespace is ignored) is an integral multiple of the
+ * number of cores: the suffix is stripped and the rest is multiplied by the available processor count.</li>
+ * <li>Any other string is treated as a number in string form.</li>
+ * <li>Anything else, including {@code null}, yields twice the number of available processors.</li>
+ * </ul>
+ * @param numThreads The configured thread count; may be a Number, a String or null.
+ * @return The number of threads to use.
+ * @throws NumberFormatException if a string value does not contain a parsable integer
+ */
+ private static int getNumThreads(Object numThreads) {
+   if(numThreads instanceof Number) {
+     return ((Number)numThreads).intValue();
+   }
+   if(numThreads instanceof String) {
+     String numThreadsStr = ((String)numThreads).trim();
+     if(numThreadsStr.endsWith("C") || numThreadsStr.endsWith("c")) {
+       // Strip only the trailing suffix (not every "C" in the string) and tolerate
+       // whitespace between the factor and the suffix, e.g. "2 C".
+       String factorStr = numThreadsStr.substring(0, numThreadsStr.length() - 1).trim();
+       return Integer.parseInt(factorStr) * Runtime.getRuntime().availableProcessors();
+     }
+     return Integer.parseInt(numThreadsStr);
+   }
+   // Neither a number nor a string: fall back to two threads per core.
+   return 2*Runtime.getRuntime().availableProcessors();
+ }
+
+ /**
+ * The strategy to use. This indicates which part of the config that this bolt uses
+ * to enrich, threat intel or enrichment. This must conform to one of the EnrichmentStrategies
+ * enum.
+ * @param strategy The name of the strategy; it is resolved with {@code EnrichmentStrategies.valueOf},
+ * so it must match one of the EnrichmentStrategies constants exactly.
+ * @return Instance of this class
+ */
+ public UnifiedEnrichmentBolt withStrategy(String strategy) {
+ this.strategy = EnrichmentStrategies.valueOf(strategy);
+ return this;
+ }
+
+ /**
+ * Set the maximum size of the cache used for the enrichments.
+ * @param maxCacheSize Maximum size of cache before flushing
+ * @return Instance of this class
+ */
+ public UnifiedEnrichmentBolt withMaxCacheSize(long maxCacheSize) {
+ this.maxCacheSize = maxCacheSize;
+ return this;
+ }
+
+ /**
+ * Set how long an entry is kept in the cache.
+ * @param maxTimeRetain Maximum time to retain cached entry before expiring
+ * @return Instance of this class
+ */
+
+ public UnifiedEnrichmentBolt withMaxTimeRetain(long maxTimeRetain) {
+ this.maxTimeRetain = maxTimeRetain;
+ return this;
+ }
+
+ /**
+ * Invalidate the cache on reload of bolt. By default, we do not.
+ * @param cacheInvalidationOnReload If true, the reload callback invalidates the strategy's cache.
+ * @return Instance of this class
+ */
+ public UnifiedEnrichmentBolt withCacheInvalidationOnReload(boolean cacheInvalidationOnReload) {
+ this.invalidateCacheOnReload= cacheInvalidationOnReload;
+ return this;
+ }
+
+
+ /**
+ * React to a configuration reload.
+ *
+ * If cache invalidation on reload is enabled and the strategy has a cache, every cached entry is
+ * invalidated. If the reloaded configuration is the global one, the current global config is pushed
+ * to each registered enrichment adapter.
+ *
+ * @param name The name passed in by the caller; it is not used here.
+ * @param type The type of configuration that was reloaded.
+ */
+ @Override
+ public void reloadCallback(String name, ConfigurationType type) {
+   if(invalidateCacheOnReload && strategy.getCache() != null) {
+     strategy.getCache().invalidateAll();
+   }
+   if(type == ConfigurationType.GLOBAL && enrichmentsByType != null) {
+     // Use the map's declared value type rather than the raw EnrichmentAdapter.
+     for(EnrichmentAdapter<CacheKey> adapter : enrichmentsByType.values()) {
+       adapter.updateAdapter(getConfigurations().getGlobalConfig());
+     }
+   }
+ }
+
+
+ /**
+ * Fully enrich a message based on the strategy which was used to configure the bolt.
+ * Each enrichment is done in parallel and the results are joined together. Each enrichment
+ * will use a cache so computation is avoided if the result has been computed before.
+ *
+ * Errors in the enrichment result in an error message being sent on the "error" stream.
+ * The successful enrichments will be joined with the original message and the message will
+ * be sent along the "message" stream.
+ *
+ * The input tuple is acked once the message has been generated, whether or not enrichment succeeded.
+ *
+ * @param input The input tuple to be processed.
+ */
+ @Override
+ public void execute(Tuple input) {
+   // NOTE(review): this runs outside the try block, so an exception thrown here is neither
+   // reported as a MetronError nor acked by this method.
+   JSONObject message = generateMessage(input);
+   try {
+     String sourceType = MessageUtils.getSensorType(message);
+     SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
+     if(config == null) {
+       LOG.debug("Unable to find SensorEnrichmentConfig for sourceType: {}", sourceType);
+       config = new SensorEnrichmentConfig();
+     }
+     //This is an existing kludge for the stellar adapter to pass information along.
+     //We should figure out if this can be rearchitected a bit. This smells.
+     config.getConfiguration().putIfAbsent(STELLAR_CONTEXT_CONF, stellarContext);
+     String guid = getGUID(input, message);
+
+     // enrich the message
+     ParallelEnricher.EnrichmentResult result = enricher.apply(message, strategy, config, perfLog);
+     JSONObject enriched = result.getResult();
+     enriched = strategy.postProcess(enriched, config, enrichmentContext);
+
+     //we can emit the message now
+     collector.emit("message",
+         input,
+         new Values(guid, enriched));
+     //and handle each of the errors in turn. If any adapter errored out, we will have one message per.
+     for(Map.Entry<Object, Throwable> enrichmentError : result.getEnrichmentErrors()) {
+       // Pass the Throwable itself as the last argument: SLF4J only logs a stack trace when the
+       // trailing argument is a Throwable, and the Map.Entry is not one.
+       LOG.error("[Metron] Unable to enrich message: {}", message, enrichmentError.getValue());
+       MetronError error = new MetronError()
+           .withErrorType(strategy.getErrorType())
+           .withMessage(enrichmentError.getValue().getMessage())
+           .withThrowable(enrichmentError.getValue())
+           .addRawMessage(enrichmentError.getKey());
+       ErrorUtils.handleError(collector, error);
+     }
+   } catch (Exception e) {
+     //If something terrible and unexpected happens then we want to send an error along, but this
+     //really shouldn't be happening.
+     LOG.error("[Metron] Unable to enrich message: {}", message, e);
+     MetronError error = new MetronError()
+         .withErrorType(strategy.getErrorType())
+         .withMessage(e.getMessage())
+         .withThrowable(e)
+         .addRawMessage(message);
+     ErrorUtils.handleError(collector, error);
+   }
+   finally {
+     collector.ack(input);
+   }
+ }
+
+ /**
+ * The message field name. If this is set, then use this field to retrieve the message.
+ * @param messageFieldName The name of the field to use to retrieve the message object.
+ * @return Instance of this class
+ */
+ public UnifiedEnrichmentBolt withMessageFieldName(String messageFieldName) {
+ this.messageFieldName = messageFieldName;
+ return this;
+ }
+
+ /**
+ * Take the tuple and construct the message.
+ * @param tuple
+ * @return
+ */
+ public JSONObject generateMessage(Tuple tuple) {
--- End diff --
Isn't this what the `MessageGetStrategy` was intended to solve? Can we reuse that logic here instead?
---