You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 14:59:36 UTC

[06/52] [abbrv] metron git commit: METRON-1467: Replace Guava caches in places where the keyspace might be large (closes apache/metron#947)

METRON-1467: Replace Guava caches in places where the keyspace might be large (closes apache/metron#947)


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/abb152b8
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/abb152b8
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/abb152b8

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: abb152b83631001ad067849dfaefd2d4e3b0cdb4
Parents: 9fb0d06
Author: cstella <ce...@gmail.com>
Authored: Wed Mar 7 11:20:56 2018 -0500
Committer: cstella <ce...@gmail.com>
Committed: Wed Mar 7 11:20:56 2018 -0500

----------------------------------------------------------------------
 metron-interface/metron-rest/pom.xml            |  5 +++
 metron-platform/metron-enrichment/pom.xml       |  2 +-
 .../enrichment/bolt/GenericEnrichmentBolt.java  | 19 ++++-------
 .../apache/metron/enrichment/bolt/JoinBolt.java | 34 +++++++++-----------
 .../bolt/GenericEnrichmentBoltTest.java         |  2 +-
 .../metron/enrichment/bolt/JoinBoltTest.java    |  7 ++--
 metron-stellar/stellar-common/pom.xml           |  5 +++
 .../stellar/common/BaseStellarProcessor.java    | 31 +++++++-----------
 .../stellar/dsl/functions/DateFunctions.java    |  8 ++---
 pom.xml                                         |  1 +
 10 files changed, 56 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-interface/metron-rest/pom.xml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/pom.xml b/metron-interface/metron-rest/pom.xml
index 44bad97..dcdea2b 100644
--- a/metron-interface/metron-rest/pom.xml
+++ b/metron-interface/metron-rest/pom.xml
@@ -38,6 +38,11 @@
         <eclipse.link.version>2.6.4</eclipse.link.version>
     </properties>
     <dependencies>
+      <dependency>
+          <groupId>com.github.ben-manes.caffeine</groupId>
+          <artifactId>caffeine</artifactId>
+          <version>${global_caffeine_version}</version>
+        </dependency>
         <dependency>
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>

http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/pom.xml b/metron-platform/metron-enrichment/pom.xml
index bcfb41b..1dffd8b 100644
--- a/metron-platform/metron-enrichment/pom.xml
+++ b/metron-platform/metron-enrichment/pom.xml
@@ -70,7 +70,7 @@
         <dependency>
           <groupId>com.github.ben-manes.caffeine</groupId>
           <artifactId>caffeine</artifactId>
-          <version>2.6.2</version>
+          <version>${global_caffeine_version}</version>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 7d67d2d..0677453 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -18,13 +18,13 @@
 
 package org.apache.metron.enrichment.bolt;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
 import org.apache.metron.common.configuration.ConfigurationType;
@@ -146,13 +146,8 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
       throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified");
     if (this.adapter == null)
       throw new IllegalStateException("Adapter must be specified");
-    loader = new CacheLoader<CacheKey, JSONObject>() {
-      @Override
-      public JSONObject load(CacheKey key) throws Exception {
-        return adapter.enrich(key);
-      }
-    };
-    cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize)
+    loader = key -> adapter.enrich(key);
+    cache = Caffeine.newBuilder().maximumSize(maxCacheSize)
             .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
             .build(loader);
     boolean success = adapter.initializeAdapter(getConfigurations().getGlobalConfig());
@@ -228,7 +223,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
               subGroup = adapter.getStreamSubGroup(enrichmentType, field);
 
               perfLog.mark("enrich");
-              enrichedField = cache.getUnchecked(cacheKey);
+              enrichedField = cache.get(cacheKey);
               perfLog.log("enrich", "key={}, time to run enrichment type={}", key, enrichmentType);
 
               if (enrichedField == null)

http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
index 61d7c32..a9263fb 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
@@ -17,13 +17,12 @@
  */
 package org.apache.metron.enrichment.bolt;
 
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
 import com.google.common.base.Joiner;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.Sets;
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
@@ -46,6 +45,9 @@ import org.apache.storm.tuple.Values;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
 
   public static class Perf {} // used for performance logging
@@ -89,29 +91,25 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
     if (this.maxTimeRetain == null) {
       throw new IllegalStateException("maxTimeRetain must be specified");
     }
-    loader = new CacheLoader<String, Map<String, Tuple>>() {
-      @Override
-      public Map<String, Tuple> load(String key) throws Exception {
-        return new HashMap<>();
-      }
-    };
-    cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize)
-            .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES).removalListener(new JoinRemoveListener())
-            .build(loader);
+    loader = s -> new HashMap<>();
+    cache = Caffeine.newBuilder().maximumSize(maxCacheSize)
+                         .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
+                         .removalListener(new JoinRemoveListener())
+                         .build(loader);
     prepare(map, topologyContext);
   }
 
   class JoinRemoveListener implements RemovalListener<String, Map<String, Tuple>> {
 
     @Override
-    public void onRemoval(RemovalNotification<String, Map<String, Tuple>> removalNotification) {
-      if (removalNotification.getCause() == RemovalCause.SIZE) {
+    public void onRemoval(@Nullable String s, @Nullable Map<String, Tuple> stringTupleMap, @Nonnull RemovalCause removalCause) {
+      if (removalCause == RemovalCause.SIZE) {
         String errorMessage = "Join cache reached max size limit. Increase the maxCacheSize setting or add more tasks to enrichment/threatintel join bolt.";
         Exception exception = new Exception(errorMessage);
         LOG.error(errorMessage, exception);
         collector.reportError(exception);
       }
-      if (removalNotification.getCause() == RemovalCause.EXPIRED) {
+      if (removalCause == RemovalCause.EXPIRED) {
         String errorMessage = "Message was in the join cache too long which may be caused by slow enrichments/threatintels.  Increase the maxTimeRetain setting.";
         Exception exception = new Exception(errorMessage);
         LOG.error(errorMessage, exception);

http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
index d7b54dd..17a53f4 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
@@ -240,7 +240,7 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest {
               put("field2", "value2");
               put("source.type", "test");
             }})
-            .withThrowable(new CacheLoader.InvalidCacheLoadException("CacheLoader returned null for key CacheKey{field='field1', value='value1'}."));
+            .withThrowable(new Exception("[Metron] Could not enrich string: value1"));
     verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
index 1bb1083..0da6eaa 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.metron.enrichment.bolt;
 
-import com.google.common.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.error.MetronError;
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyMap;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -176,10 +177,10 @@ public class JoinBoltTest extends BaseEnrichmentBoltTest {
     when(tuple.getValueByField("key")).thenReturn(key);
     when(tuple.getValueByField("message")).thenReturn(new JSONObject());
     joinBolt.cache = mock(LoadingCache.class);
-    when(joinBolt.cache.get(key)).thenThrow(new ExecutionException(new Exception("join exception")));
+    when(joinBolt.cache.get(any())).thenThrow(new RuntimeException(new Exception("join exception")));
 
     joinBolt.execute(tuple);
-    ExecutionException expectedExecutionException = new ExecutionException(new Exception("join exception"));
+    RuntimeException expectedExecutionException = new RuntimeException(new Exception("join exception"));
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
             .withMessage("Joining problem: {}")

http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-stellar/stellar-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/pom.xml b/metron-stellar/stellar-common/pom.xml
index 6b07e68..dc4eb90 100644
--- a/metron-stellar/stellar-common/pom.xml
+++ b/metron-stellar/stellar-common/pom.xml
@@ -30,6 +30,11 @@
     </properties>
     <dependencies>
         <dependency>
+          <groupId>com.github.ben-manes.caffeine</groupId>
+          <artifactId>caffeine</artifactId>
+          <version>${global_caffeine_version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-auth</artifactId>
             <version>${global_hadoop_version}</version>

http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java
index 922feb7..941c66d 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java
@@ -18,16 +18,14 @@
 
 package org.apache.metron.stellar.common;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.util.concurrent.UncheckedExecutionException;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import org.antlr.v4.runtime.ANTLRInputStream;
 import org.antlr.v4.runtime.CommonTokenStream;
 import org.antlr.v4.runtime.TokenStream;
 
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.metron.stellar.dsl.Context;
@@ -95,16 +93,11 @@ public class BaseStellarProcessor<T> {
                                                        , int expiryTime
                                                        , TimeUnit expiryUnit
                                                        ) {
-    CacheLoader<String, StellarCompiler.Expression> loader = new CacheLoader<String, StellarCompiler.Expression>() {
-      @Override
-      public StellarCompiler.Expression load(String key) throws Exception {
-        return compile(key);
-      }
-    };
-    return CacheBuilder.newBuilder()
-                       .maximumSize(cacheSize)
-                       .expireAfterAccess(expiryTime, expiryUnit)
-                       .build(loader);
+    CacheLoader<String, StellarCompiler.Expression> loader = key -> compile(key);
+    return Caffeine.newBuilder()
+                   .maximumSize(cacheSize)
+                   .expireAfterAccess(expiryTime, expiryUnit)
+                   .build(loader);
   }
 
   /**
@@ -119,8 +112,8 @@ public class BaseStellarProcessor<T> {
     }
     StellarCompiler.Expression expression = null;
     try {
-      expression = expressionCache.get(rule, () -> compile(rule));
-    } catch (ExecutionException e) {
+      expression = expressionCache.get(rule, r -> compile(r));
+    } catch (Throwable e) {
       throw new ParseException("Unable to parse: " + rule + " due to: " + e.getMessage(), e);
     }
     return expression.variablesUsed;
@@ -143,8 +136,8 @@ public class BaseStellarProcessor<T> {
       context.setActivityType(ActivityType.PARSE_ACTIVITY);
     }
     try {
-      expression = expressionCache.get(rule, () -> compile(rule));
-    } catch (ExecutionException|UncheckedExecutionException e) {
+      expression = expressionCache.get(rule, r -> compile(r));
+    } catch (Throwable e) {
       throw new ParseException("Unable to parse: " + rule + " due to: " + e.getMessage(), e);
     }
     try {

http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java
index 6031b6c..212d6e9 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java
@@ -18,9 +18,9 @@
 
 package org.apache.metron.stellar.dsl.functions;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.metron.stellar.dsl.BaseStellarFunction;
 import org.apache.metron.stellar.dsl.Stellar;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
@@ -77,7 +77,7 @@ public class DateFunctions {
   }
 
   private static LoadingCache<TimezonedFormat, ThreadLocal<SimpleDateFormat>> formatCache =
-          CacheBuilder.newBuilder().build(
+          Caffeine.newBuilder().build(
                   new CacheLoader<TimezonedFormat, ThreadLocal<SimpleDateFormat>>() {
                     @Override
                     public ThreadLocal<SimpleDateFormat> load(final TimezonedFormat format) throws Exception {

http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 98c942a..e1049dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,6 +85,7 @@
         <base_flume_version>1.5.2</base_flume_version>
         <!-- full dependency versions -->
         <global_accumulo_version>1.8.0</global_accumulo_version>
+        <global_caffeine_version>2.6.2</global_caffeine_version>
         <global_antlr_version>4.5</global_antlr_version>
         <global_opencsv_version>3.7</global_opencsv_version>
         <global_curator_version>2.7.1</global_curator_version>