You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 14:59:36 UTC
[06/52] [abbrv] metron git commit: METRON-1467: Replace guava caches in places where the keyspace might be large closes apache/metron#947
METRON-1467: Replace guava caches in places where the keyspace might be large closes apache/metron#947
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/abb152b8
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/abb152b8
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/abb152b8
Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: abb152b83631001ad067849dfaefd2d4e3b0cdb4
Parents: 9fb0d06
Author: cstella <ce...@gmail.com>
Authored: Wed Mar 7 11:20:56 2018 -0500
Committer: cstella <ce...@gmail.com>
Committed: Wed Mar 7 11:20:56 2018 -0500
----------------------------------------------------------------------
metron-interface/metron-rest/pom.xml | 5 +++
metron-platform/metron-enrichment/pom.xml | 2 +-
.../enrichment/bolt/GenericEnrichmentBolt.java | 19 ++++-------
.../apache/metron/enrichment/bolt/JoinBolt.java | 34 +++++++++-----------
.../bolt/GenericEnrichmentBoltTest.java | 2 +-
.../metron/enrichment/bolt/JoinBoltTest.java | 7 ++--
metron-stellar/stellar-common/pom.xml | 5 +++
.../stellar/common/BaseStellarProcessor.java | 31 +++++++-----------
.../stellar/dsl/functions/DateFunctions.java | 8 ++---
pom.xml | 1 +
10 files changed, 56 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-interface/metron-rest/pom.xml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/pom.xml b/metron-interface/metron-rest/pom.xml
index 44bad97..dcdea2b 100644
--- a/metron-interface/metron-rest/pom.xml
+++ b/metron-interface/metron-rest/pom.xml
@@ -38,6 +38,11 @@
<eclipse.link.version>2.6.4</eclipse.link.version>
</properties>
<dependencies>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>${global_caffeine_version}</version>
+ </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/pom.xml b/metron-platform/metron-enrichment/pom.xml
index bcfb41b..1dffd8b 100644
--- a/metron-platform/metron-enrichment/pom.xml
+++ b/metron-platform/metron-enrichment/pom.xml
@@ -70,7 +70,7 @@
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
- <version>2.6.2</version>
+ <version>${global_caffeine_version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 7d67d2d..0677453 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -18,13 +18,13 @@
package org.apache.metron.enrichment.bolt;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.metron.common.Constants;
import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
import org.apache.metron.common.configuration.ConfigurationType;
@@ -146,13 +146,8 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified");
if (this.adapter == null)
throw new IllegalStateException("Adapter must be specified");
- loader = new CacheLoader<CacheKey, JSONObject>() {
- @Override
- public JSONObject load(CacheKey key) throws Exception {
- return adapter.enrich(key);
- }
- };
- cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize)
+ loader = key -> adapter.enrich(key);
+ cache = Caffeine.newBuilder().maximumSize(maxCacheSize)
.expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
.build(loader);
boolean success = adapter.initializeAdapter(getConfigurations().getGlobalConfig());
@@ -228,7 +223,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
subGroup = adapter.getStreamSubGroup(enrichmentType, field);
perfLog.mark("enrich");
- enrichedField = cache.getUnchecked(cacheKey);
+ enrichedField = cache.get(cacheKey);
perfLog.log("enrich", "key={}, time to run enrichment type={}", key, enrichmentType);
if (enrichedField == null)
http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
index 61d7c32..a9263fb 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
@@ -17,13 +17,12 @@
*/
package org.apache.metron.enrichment.bolt;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
import com.google.common.base.Joiner;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Sets;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
@@ -46,6 +45,9 @@ import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
public static class Perf {} // used for performance logging
@@ -89,29 +91,25 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
if (this.maxTimeRetain == null) {
throw new IllegalStateException("maxTimeRetain must be specified");
}
- loader = new CacheLoader<String, Map<String, Tuple>>() {
- @Override
- public Map<String, Tuple> load(String key) throws Exception {
- return new HashMap<>();
- }
- };
- cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize)
- .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES).removalListener(new JoinRemoveListener())
- .build(loader);
+ loader = s -> new HashMap<>();
+ cache = Caffeine.newBuilder().maximumSize(maxCacheSize)
+ .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
+ .removalListener(new JoinRemoveListener())
+ .build(loader);
prepare(map, topologyContext);
}
class JoinRemoveListener implements RemovalListener<String, Map<String, Tuple>> {
@Override
- public void onRemoval(RemovalNotification<String, Map<String, Tuple>> removalNotification) {
- if (removalNotification.getCause() == RemovalCause.SIZE) {
+ public void onRemoval(@Nullable String s, @Nullable Map<String, Tuple> stringTupleMap, @Nonnull RemovalCause removalCause) {
+ if (removalCause == RemovalCause.SIZE) {
String errorMessage = "Join cache reached max size limit. Increase the maxCacheSize setting or add more tasks to enrichment/threatintel join bolt.";
Exception exception = new Exception(errorMessage);
LOG.error(errorMessage, exception);
collector.reportError(exception);
}
- if (removalNotification.getCause() == RemovalCause.EXPIRED) {
+ if (removalCause == RemovalCause.EXPIRED) {
String errorMessage = "Message was in the join cache too long which may be caused by slow enrichments/threatintels. Increase the maxTimeRetain setting.";
Exception exception = new Exception(errorMessage);
LOG.error(errorMessage, exception);
http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
index d7b54dd..17a53f4 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
@@ -240,7 +240,7 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest {
put("field2", "value2");
put("source.type", "test");
}})
- .withThrowable(new CacheLoader.InvalidCacheLoadException("CacheLoader returned null for key CacheKey{field='field1', value='value1'}."));
+ .withThrowable(new Exception("[Metron] Could not enrich string: value1"));
verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
index 1bb1083..0da6eaa 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.metron.enrichment.bolt;
-import com.google.common.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.LoadingCache;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.Constants;
import org.apache.metron.common.error.MetronError;
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -176,10 +177,10 @@ public class JoinBoltTest extends BaseEnrichmentBoltTest {
when(tuple.getValueByField("key")).thenReturn(key);
when(tuple.getValueByField("message")).thenReturn(new JSONObject());
joinBolt.cache = mock(LoadingCache.class);
- when(joinBolt.cache.get(key)).thenThrow(new ExecutionException(new Exception("join exception")));
+ when(joinBolt.cache.get(any())).thenThrow(new RuntimeException(new Exception("join exception")));
joinBolt.execute(tuple);
- ExecutionException expectedExecutionException = new ExecutionException(new Exception("join exception"));
+ RuntimeException expectedExecutionException = new RuntimeException(new Exception("join exception"));
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
.withMessage("Joining problem: {}")
http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-stellar/stellar-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/pom.xml b/metron-stellar/stellar-common/pom.xml
index 6b07e68..dc4eb90 100644
--- a/metron-stellar/stellar-common/pom.xml
+++ b/metron-stellar/stellar-common/pom.xml
@@ -30,6 +30,11 @@
</properties>
<dependencies>
<dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>${global_caffeine_version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${global_hadoop_version}</version>
http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java
index 922feb7..941c66d 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java
@@ -18,16 +18,14 @@
package org.apache.metron.stellar.common;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.util.concurrent.UncheckedExecutionException;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
import org.antlr.v4.runtime.ANTLRInputStream;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.TokenStream;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.metron.stellar.dsl.Context;
@@ -95,16 +93,11 @@ public class BaseStellarProcessor<T> {
, int expiryTime
, TimeUnit expiryUnit
) {
- CacheLoader<String, StellarCompiler.Expression> loader = new CacheLoader<String, StellarCompiler.Expression>() {
- @Override
- public StellarCompiler.Expression load(String key) throws Exception {
- return compile(key);
- }
- };
- return CacheBuilder.newBuilder()
- .maximumSize(cacheSize)
- .expireAfterAccess(expiryTime, expiryUnit)
- .build(loader);
+ CacheLoader<String, StellarCompiler.Expression> loader = key -> compile(key);
+ return Caffeine.newBuilder()
+ .maximumSize(cacheSize)
+ .expireAfterAccess(expiryTime, expiryUnit)
+ .build(loader);
}
/**
@@ -119,8 +112,8 @@ public class BaseStellarProcessor<T> {
}
StellarCompiler.Expression expression = null;
try {
- expression = expressionCache.get(rule, () -> compile(rule));
- } catch (ExecutionException e) {
+ expression = expressionCache.get(rule, r -> compile(r));
+ } catch (Throwable e) {
throw new ParseException("Unable to parse: " + rule + " due to: " + e.getMessage(), e);
}
return expression.variablesUsed;
@@ -143,8 +136,8 @@ public class BaseStellarProcessor<T> {
context.setActivityType(ActivityType.PARSE_ACTIVITY);
}
try {
- expression = expressionCache.get(rule, () -> compile(rule));
- } catch (ExecutionException|UncheckedExecutionException e) {
+ expression = expressionCache.get(rule, r -> compile(r));
+ } catch (Throwable e) {
throw new ParseException("Unable to parse: " + rule + " due to: " + e.getMessage(), e);
}
try {
http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java
index 6031b6c..212d6e9 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java
@@ -18,9 +18,9 @@
package org.apache.metron.stellar.dsl.functions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.metron.stellar.dsl.BaseStellarFunction;
import org.apache.metron.stellar.dsl.Stellar;
import org.apache.metron.stellar.common.utils.ConversionUtils;
@@ -77,7 +77,7 @@ public class DateFunctions {
}
private static LoadingCache<TimezonedFormat, ThreadLocal<SimpleDateFormat>> formatCache =
- CacheBuilder.newBuilder().build(
+ Caffeine.newBuilder().build(
new CacheLoader<TimezonedFormat, ThreadLocal<SimpleDateFormat>>() {
@Override
public ThreadLocal<SimpleDateFormat> load(final TimezonedFormat format) throws Exception {
http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 98c942a..e1049dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,6 +85,7 @@
<base_flume_version>1.5.2</base_flume_version>
<!-- full dependency versions -->
<global_accumulo_version>1.8.0</global_accumulo_version>
+ <global_caffeine_version>2.6.2</global_caffeine_version>
<global_antlr_version>4.5</global_antlr_version>
<global_opencsv_version>3.7</global_opencsv_version>
<global_curator_version>2.7.1</global_curator_version>