You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2019/06/17 18:33:00 UTC
[metron] branch master updated: METRON-2073 Create in-memory use case for enrichment with map type and flatfile summarizer (merrimanr) closes apache/metron#1399
This is an automated email from the ASF dual-hosted git repository.
rmerriman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 38b8a78 METRON-2073 Create in-memory use case for enrichment with map type and flatfile summarizer (merrimanr) closes apache/metron#1399
38b8a78 is described below
commit 38b8a7824a3ceed829215c8ce4ecad963ebf449e
Author: merrimanr <me...@gmail.com>
AuthorDate: Mon Jun 17 13:32:44 2019 -0500
METRON-2073 Create in-memory use case for enrichment with map type and flatfile summarizer (merrimanr) closes apache/metron#1399
---
.../metron/enrichment/cache/ObjectCache.java | 123 +++++++++++++++++
.../metron/enrichment/cache/ObjectCacheConfig.java | 115 ++++++++++++++++
.../enrichment/stellar/EnrichmentObjectGet.java | 101 ++++++++++++++
.../metron/enrichment/stellar/ObjectGet.java | 94 ++-----------
.../ObjectCacheTest.java} | 63 ++++++---
.../EnrichmentObjectGetIntegrationTest.java | 72 ++++++++++
.../stellar/EnrichmentObjectGetTest.java | 152 +++++++++++++++++++++
.../stellar/ObjectGetIntegrationTest.java | 70 ++++++++++
.../metron/enrichment/stellar/ObjectGetTest.java | 115 +++++++++-------
9 files changed, 757 insertions(+), 148 deletions(-)
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/cache/ObjectCache.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/cache/ObjectCache.java
new file mode 100644
index 0000000..9d22bfc
--- /dev/null
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/cache/ObjectCache.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.cache;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class ObjectCache {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ protected LoadingCache<String, Object> cache;
+ private static ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ public class Loader implements CacheLoader<String, Object> {
+ FileSystem fs;
+ ObjectCacheConfig objectCacheConfig;
+
+ public Loader(Configuration hadoopConfig, ObjectCacheConfig objectCacheConfig) throws IOException {
+ this.fs = FileSystem.get(hadoopConfig);
+ this.objectCacheConfig = objectCacheConfig;
+ }
+
+ @Override
+ public Object load(String s) throws Exception {
+ LOG.debug("Loading object from path '{}'", s);
+ if (StringUtils.isEmpty(s)) {
+ throw new IllegalArgumentException("Path cannot be empty");
+ }
+ Object object = null;
+ Path p = new Path(s);
+ if (fs.exists(p)) {
+ if (fs.getFileStatus(p).getLen() <= objectCacheConfig.getMaxFileSize()) {
+ try (InputStream is = new BufferedInputStream(fs.open(p))) {
+ byte[] serialized = IOUtils.toByteArray(is);
+ if (serialized.length > 0) {
+ object = SerDeUtils.fromBytes(serialized, Object.class);
+ }
+ }
+ } else {
+ throw new IllegalArgumentException(String.format("File at path '%s' is larger than the configured max file size of %s", p, objectCacheConfig.getMaxFileSize()));
+ }
+ } else {
+ throw new IllegalArgumentException(String.format("Path '%s' could not be found in HDFS", s));
+ }
+ return object;
+ }
+ }
+
+ public Object get(String path) {
+ return cache.get(path);
+ }
+
+ public void initialize(ObjectCacheConfig config) {
+ try {
+ lock.writeLock().lock();
+
+ cache = setupCache(config);
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to initialize: " + e.getMessage(), e);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public boolean isInitialized() {
+ try {
+ lock.readLock().lock();
+ return cache != null;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ protected LoadingCache<String, Object> setupCache(ObjectCacheConfig config) throws IOException {
+ LOG.info("Building ObjectCache with {}", config);
+ return Caffeine.newBuilder().maximumSize(config.getCacheSize())
+ .expireAfterWrite(config.getCacheExpiration(), config.getTimeUnit())
+ .removalListener((path, value, removalCause) -> {
+ LOG.debug("Object retrieved from path '{}' was removed with cause {}", path, removalCause);
+ })
+ .build(new Loader(new Configuration(), config));
+ }
+
+ public boolean isEmpty() {
+ return cache == null || cache.estimatedSize() == 0;
+ }
+
+ public boolean containsKey(String key) {
+ return cache != null && cache.asMap().containsKey(key);
+ }
+}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/cache/ObjectCacheConfig.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/cache/ObjectCacheConfig.java
new file mode 100644
index 0000000..2c0c97e
--- /dev/null
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/cache/ObjectCacheConfig.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.cache;
+
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+public class ObjectCacheConfig {
+
+ public static final String OBJECT_CACHE_SIZE_KEY = "object.cache.size";
+ public static final String OBJECT_CACHE_EXPIRATION_MINUTES_KEY = "object.cache.expiration.minutes";
+ public static final String OBJECT_CACHE_EXPIRATION_KEY = "object.cache.expiration";
+ public static final String OBJECT_CACHE_TIME_UNIT_KEY = "object.cache.time.unit";
+ public static final String OBJECT_CACHE_MAX_FILE_SIZE_KEY = "object.cache.max.file.size";
+ public static final long OBJECT_CACHE_SIZE_DEFAULT = 1000;
+ public static final long OBJECT_CACHE_EXPIRATION_MIN_DEFAULT = 1440;
+ public static final TimeUnit OBJECT_CACHE_TIME_UNIT_DEFAULT = TimeUnit.MINUTES;
+ public static final long OBJECT_CACHE_MAX_FILE_SIZE_DEFAULT = 1048576; // default to 1 mb
+
+ private long cacheSize;
+ private long cacheExpiration;
+ private TimeUnit timeUnit;
+ private long maxFileSize;
+
+ public ObjectCacheConfig(Map<String, Object> config) {
+ cacheSize = ConversionUtils.convert(config.getOrDefault(OBJECT_CACHE_SIZE_KEY, OBJECT_CACHE_SIZE_DEFAULT), Long.class);
+ if (config.containsKey(OBJECT_CACHE_EXPIRATION_MINUTES_KEY)) {
+ cacheExpiration = ConversionUtils.convert(config.getOrDefault(OBJECT_CACHE_EXPIRATION_MINUTES_KEY, OBJECT_CACHE_EXPIRATION_MIN_DEFAULT), Long.class);
+ timeUnit = OBJECT_CACHE_TIME_UNIT_DEFAULT;
+ } else {
+ cacheExpiration = ConversionUtils.convert(config.getOrDefault(OBJECT_CACHE_EXPIRATION_KEY, OBJECT_CACHE_EXPIRATION_MIN_DEFAULT), Long.class);
+ timeUnit = config.containsKey(OBJECT_CACHE_TIME_UNIT_KEY) ?
+ TimeUnit.valueOf((String) config.get(OBJECT_CACHE_TIME_UNIT_KEY)) : OBJECT_CACHE_TIME_UNIT_DEFAULT;
+ }
+ maxFileSize = ConversionUtils.convert(config.getOrDefault(OBJECT_CACHE_MAX_FILE_SIZE_KEY, OBJECT_CACHE_MAX_FILE_SIZE_DEFAULT), Long.class);
+ }
+
+ public long getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(long cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
+ public long getCacheExpiration() {
+ return cacheExpiration;
+ }
+
+ public void setCacheExpiration(long cacheExpiration) {
+ this.cacheExpiration = cacheExpiration;
+ }
+
+ public TimeUnit getTimeUnit() {
+ return timeUnit;
+ }
+
+ public void setTimeUnit(TimeUnit timeUnit) {
+ this.timeUnit = timeUnit;
+ }
+
+ public long getMaxFileSize() {
+ return maxFileSize;
+ }
+
+ public void setMaxFileSize(long maxFileSize) {
+ this.maxFileSize = maxFileSize;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ObjectCacheConfig that = (ObjectCacheConfig) o;
+ return cacheSize == that.cacheSize &&
+ cacheExpiration == that.cacheExpiration &&
+ timeUnit == that.timeUnit &&
+ maxFileSize == that.maxFileSize;
+ }
+
+ @Override
+ public int hashCode() {
+
+ return Objects.hash(cacheSize, cacheExpiration, timeUnit, maxFileSize);
+ }
+
+ @Override
+ public String toString() {
+ return "ObjectCacheConfig{" +
+ "cacheSize=" + cacheSize +
+ ", cacheExpiration=" + cacheExpiration +
+ ", timeUnit=" + timeUnit +
+ ", maxFileSize=" + maxFileSize +
+ '}';
+ }
+}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGet.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGet.java
new file mode 100644
index 0000000..11ebe88
--- /dev/null
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGet.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.stellar;
+
+import org.apache.metron.enrichment.cache.ObjectCache;
+import org.apache.metron.enrichment.cache.ObjectCacheConfig;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.metron.enrichment.stellar.EnrichmentObjectGet.ENRICHMENT_OBJECT_GET_SETTINGS;
+
+@Stellar(namespace="ENRICHMENT"
+ ,name="OBJECT_GET"
+ ,description="Retrieve and deserialize a serialized object from HDFS and stores it in the ObjectCache, " +
+ "then returns the value associated with the indicator." +
+ "The cache can be specified via three properties in the global config: " +
+ "\"" + ObjectCacheConfig.OBJECT_CACHE_SIZE_KEY + "\" (default " + ObjectCacheConfig.OBJECT_CACHE_SIZE_DEFAULT + ")," +
+ "\"" + ObjectCacheConfig.OBJECT_CACHE_EXPIRATION_KEY + "\" (default " + ObjectCacheConfig.OBJECT_CACHE_EXPIRATION_MIN_DEFAULT + ")," +
+ "\"" + ObjectCacheConfig.OBJECT_CACHE_TIME_UNIT_KEY+ "\" (default MINUTES)." +
+ "Cache settings that apply only to this function can also be specified in the global config by nesting the settings above under the " + ENRICHMENT_OBJECT_GET_SETTINGS + " key." +
+ "Note, if these are changed in global config, topology restart is required."
+ , params = {
+ "path - The path in HDFS to the serialized object" +
+ "indicator - The string indicator to look up"
+ }
+ , returns="Value associated with the indicator."
+)
+public class EnrichmentObjectGet implements StellarFunction {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public final static String ENRICHMENT_OBJECT_GET_SETTINGS = "enrichment.object.get.settings";
+
+ private ObjectCache objectCache;
+
+ @Override
+ public Object apply(List<Object> args, Context context) throws ParseException {
+ if(args.size() != 2) {
+ throw new IllegalArgumentException("All parameters are mandatory, submit 'hdfs path', 'indicator'");
+ }
+ if(!isInitialized()) {
+ return null;
+ }
+
+ String path = (String) args.get(0);
+ String indicator = (String) args.get(1);
+ if(path == null || indicator == null) {
+ return null;
+ }
+
+ Object value;
+ try {
+ Map cachedMap = (Map) objectCache.get(path);
+ LOG.debug("Looking up value from object at path '{}' using indicator {}", path, indicator);
+ value = cachedMap.get(indicator);
+ } catch(ClassCastException e) {
+ throw new ClassCastException(String.format("The object stored in HDFS at '%s' must be serialized in JSON format.", path));
+ }
+
+ return value;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void initialize(Context context) {
+ Map<String, Object> config = (Map<String, Object>) context.getCapability(Context.Capabilities.GLOBAL_CONFIG, false)
+ .orElse(new HashMap<>());
+ Map<String, Object> enrichmentGetConfig = (Map<String, Object>) config.getOrDefault(ENRICHMENT_OBJECT_GET_SETTINGS, new HashMap<>());
+ ObjectCacheConfig objectCacheConfig = new ObjectCacheConfig(enrichmentGetConfig);
+ objectCache = new ObjectCache();
+ objectCache.initialize(objectCacheConfig);
+ }
+
+ @Override
+ public boolean isInitialized() {
+ return objectCache != null && objectCache.isInitialized();
+ }
+}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java
index ebb94da..dddef5f 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java
@@ -18,16 +18,8 @@
package org.apache.metron.enrichment.stellar;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.metron.common.utils.SerDeUtils;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.enrichment.cache.ObjectCache;
+import org.apache.metron.enrichment.cache.ObjectCacheConfig;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.ParseException;
import org.apache.metron.stellar.dsl.Stellar;
@@ -35,26 +27,19 @@ import org.apache.metron.stellar.dsl.StellarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
@Stellar(namespace="OBJECT"
,name="GET"
,description="Retrieve and deserialize a serialized object from HDFS. " +
- "The cache can be specified via two properties in the global config: " +
- "\"" + ObjectGet.OBJECT_CACHE_SIZE_KEY + "\" (default " + ObjectGet.OBJECT_CACHE_SIZE_DEFAULT + ")," +
- "\"" + ObjectGet.OBJECT_CACHE_EXPIRATION_KEY+ "\" (default 1440). Note, if these are changed in global config, " +
- "topology restart is required."
+ "The cache can be specified via three properties in the global config: " +
+ "\"" + ObjectCacheConfig.OBJECT_CACHE_SIZE_KEY + "\" (default " + ObjectCacheConfig.OBJECT_CACHE_SIZE_DEFAULT + ")," +
+ "\"" + ObjectCacheConfig.OBJECT_CACHE_EXPIRATION_KEY + "\" (default " + ObjectCacheConfig.OBJECT_CACHE_EXPIRATION_MIN_DEFAULT + ")," +
+ "\"" + ObjectCacheConfig.OBJECT_CACHE_TIME_UNIT_KEY+ "\" (default MINUTES)." +
+ "Note, if these are changed in global config, topology restart is required."
, params = {
"path - The path in HDFS to the serialized object"
}
@@ -62,36 +47,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
)
public class ObjectGet implements StellarFunction {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final String OBJECT_CACHE_SIZE_KEY = "object.cache.size";
- public static final String OBJECT_CACHE_EXPIRATION_KEY = "object.cache.expiration.minutes";
- public static final int OBJECT_CACHE_SIZE_DEFAULT = 1000;
- public static final long OBJECT_CACHE_EXPIRATION_MIN_DEFAULT = TimeUnit.HOURS.toMinutes(24);
- protected static LoadingCache<String, Object> cache;
- private static ReadWriteLock lock = new ReentrantReadWriteLock();
- public static class Loader extends CacheLoader<String, Object> {
- FileSystem fs;
- public Loader(Configuration hadoopConfig) throws IOException {
- this.fs = FileSystem.get(hadoopConfig);
- }
- @Override
- public Object load(String s) throws Exception {
- if(StringUtils.isEmpty(s)) {
- return null;
- }
- Path p = new Path(s);
- if(fs.exists(p)) {
- try(InputStream is = new BufferedInputStream(fs.open(p))) {
- byte[] serialized = IOUtils.toByteArray(is);
- if(serialized.length > 0) {
- Object ret = SerDeUtils.fromBytes(serialized, Object.class);
- return ret;
- }
- }
- }
- return null;
- }
- }
+ private ObjectCache objectCache;
@Override
public Object apply(List<Object> args, Context context) throws ParseException {
@@ -106,11 +63,7 @@ public class ObjectGet implements StellarFunction {
return null;
}
if(o instanceof String) {
- try {
- return cache.get((String)o);
- } catch (ExecutionException e) {
- throw new IllegalStateException("Unable to retrieve " + o + " because " + e.getMessage(), e);
- }
+ return objectCache.get((String) o);
}
else {
throw new IllegalStateException("Unable to retrieve " + o + " as it is not a path");
@@ -119,35 +72,14 @@ public class ObjectGet implements StellarFunction {
@Override
public void initialize(Context context) {
- try {
- lock.writeLock().lock();
- Map<String, Object> config = getConfig(context);
- long size = ConversionUtils.convert(config.getOrDefault(OBJECT_CACHE_SIZE_KEY, OBJECT_CACHE_SIZE_DEFAULT), Long.class);
- long expiryMin = ConversionUtils.convert(config.getOrDefault(OBJECT_CACHE_EXPIRATION_KEY, OBJECT_CACHE_EXPIRATION_MIN_DEFAULT), Long.class);
- cache = setupCache(size, expiryMin);
- } catch (IOException e) {
- throw new IllegalStateException("Unable to initialize: " + e.getMessage(), e);
- } finally {
- lock.writeLock().unlock();
- }
+ Map<String, Object> config = getConfig(context);
+ objectCache = new ObjectCache();
+ objectCache.initialize(new ObjectCacheConfig(config));
}
@Override
public boolean isInitialized() {
- try {
- lock.readLock().lock();
- return cache != null;
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- protected LoadingCache<String, Object> setupCache(long size, long expiryMin) throws IOException {
- return CacheBuilder.newBuilder()
- .maximumSize(size)
- .expireAfterAccess(expiryMin, TimeUnit.MINUTES)
- .build(new Loader(new Configuration()));
+ return objectCache != null && objectCache.isInitialized();
}
protected Map<String, Object> getConfig(Context context) {
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/cache/ObjectCacheTest.java
similarity index 54%
copy from metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java
copy to metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/cache/ObjectCacheTest.java
index 400dfb8..4ad6494 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/cache/ObjectCacheTest.java
@@ -16,28 +16,35 @@
* limitations under the License.
*/
-package org.apache.metron.enrichment.stellar;
+package org.apache.metron.enrichment.cache;
-import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.utils.SerDeUtils;
-import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
+import org.apache.metron.integration.utils.TestUtils;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
+import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
-public class ObjectGetTest {
- FileSystem fs;
- List<String> data;
+public class ObjectCacheTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private FileSystem fs;
+ private List<String> data;
+ private ObjectCache cache;
+ private File tempDir;
@Before
public void setup() throws IOException {
@@ -49,35 +56,38 @@ public class ObjectGetTest {
data.add("is");
data.add("great");
}
-
+ cache = new ObjectCache();
+ tempDir = TestUtils.createTempDir(this.getClass().getName());
}
@Test
public void test() throws Exception {
- String filename = "target/ogt/test.ser";
- Assert.assertTrue(ObjectGet.cache == null || !ObjectGet.cache.asMap().containsKey(filename));
+ String filename = "test.ser";
+ Assert.assertTrue(cache.isEmpty() || !cache.containsKey(filename));
assertDataIsReadCorrectly(filename);
}
public void assertDataIsReadCorrectly(String filename) throws IOException {
- try(BufferedOutputStream bos = new BufferedOutputStream(fs.create(new Path(filename), true))) {
+ File file = new File(tempDir, filename);
+ try(BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file))) {
IOUtils.write(SerDeUtils.toBytes(data), bos);
}
- List<String> readData = (List<String>) StellarProcessorUtils.run("OBJECT_GET(loc)", ImmutableMap.of("loc", filename));
+ cache.initialize(new ObjectCacheConfig(new HashMap<>()));
+ List<String> readData = (List<String>) cache.get(file.getAbsolutePath());
Assert.assertEquals(readData, data);
- Assert.assertTrue(ObjectGet.cache.asMap().containsKey(filename));
+ Assert.assertTrue(cache.containsKey(file.getAbsolutePath()));
}
@Test
public void testMultithreaded() throws Exception {
- String filename = "target/ogt/testmulti.ser";
- Assert.assertTrue(ObjectGet.cache == null || !ObjectGet.cache.asMap().containsKey(filename));
+ String filename = "testmulti.ser";
+ Assert.assertTrue(cache.isEmpty() || !cache.containsKey(filename));
Thread[] ts = new Thread[10];
for(int i = 0;i < ts.length;++i) {
ts[i] = new Thread(() -> {
try {
assertDataIsReadCorrectly(filename);
- } catch (IOException e) {
+ } catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
});
@@ -87,4 +97,23 @@ public class ObjectGetTest {
t.join();
}
}
+
+ @Test
+ public void shouldThrowExceptionOnMaxFileSize() throws Exception {
+ String filename = "maxSizeException.ser";
+ File file = new File(tempDir, filename);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(String.format("File at path '%s' is larger than the configured max file size of 1", file.getAbsolutePath()));
+
+
+ try(BufferedOutputStream bos = new BufferedOutputStream(fs.create(new Path(file.getAbsolutePath()), true))) {
+ IOUtils.write(SerDeUtils.toBytes(data), bos);
+ }
+ ObjectCacheConfig objectCacheConfig = new ObjectCacheConfig(new HashMap<>());
+ objectCacheConfig.setMaxFileSize(1);
+ cache.initialize(objectCacheConfig);
+
+ cache.get(file.getAbsolutePath());
+ }
}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGetIntegrationTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGetIntegrationTest.java
new file mode 100644
index 0000000..b9ebb3f
--- /dev/null
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGetIntegrationTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.metron.enrichment.stellar;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class EnrichmentObjectGetIntegrationTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private File file;
+
+ @Before
+ public void setup() throws Exception {
+ File tempDir = TestUtils.createTempDir(this.getClass().getName());
+ file = new File(tempDir, "enrichment.ser");
+ try(BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file))) {
+ IOUtils.write(SerDeUtils.toBytes(new HashMap<String, Object>() {{
+ put("key", "value");
+ }}), bos);
+ }
+ }
+
+ @Test
+ public void shouldReturnEnrichment() {
+ String expression = String.format("ENRICHMENT_OBJECT_GET('%s', '%s')", file.getAbsolutePath(), "key");
+ String value = (String) StellarProcessorUtils.run(expression, new HashMap<>());
+ assertEquals("value", value);
+ }
+
+ @Test
+ public void shouldThrowExceptionOnInvalidPath() {
+ thrown.expect(ParseException.class);
+ thrown.expectMessage("Unable to parse ENRICHMENT_OBJECT_GET('/some/path', 'key'): Unable to parse: ENRICHMENT_OBJECT_GET('/some/path', 'key') due to: Path '/some/path' could not be found in HDFS");
+
+ String expression = String.format("ENRICHMENT_OBJECT_GET('%s', '%s')", "/some/path", "key");
+ StellarProcessorUtils.run(expression, new HashMap<>());
+ }
+}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGetTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGetTest.java
new file mode 100644
index 0000000..b12e666
--- /dev/null
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGetTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.stellar;
+
+import org.apache.metron.enrichment.cache.ObjectCache;
+import org.apache.metron.enrichment.cache.ObjectCacheConfig;
+import org.apache.metron.stellar.dsl.Context;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.enrichment.cache.ObjectCacheConfig.OBJECT_CACHE_EXPIRATION_KEY;
+import static org.apache.metron.enrichment.cache.ObjectCacheConfig.OBJECT_CACHE_MAX_FILE_SIZE_KEY;
+import static org.apache.metron.enrichment.cache.ObjectCacheConfig.OBJECT_CACHE_SIZE_KEY;
+import static org.apache.metron.enrichment.cache.ObjectCacheConfig.OBJECT_CACHE_TIME_UNIT_KEY;
+import static org.apache.metron.enrichment.stellar.EnrichmentObjectGet.ENRICHMENT_OBJECT_GET_SETTINGS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for {@link EnrichmentObjectGet}. The {@link ObjectCache} it constructs is
+ * replaced with a Mockito mock through PowerMock constructor stubbing, so no file is
+ * actually read.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({EnrichmentObjectGet.class, ObjectCache.class})
+public class EnrichmentObjectGetTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private EnrichmentObjectGet enrichmentObjectGet;
+  private ObjectCache objectCache;
+  private Context context;
+
+  /**
+   * Creates the function under test, a mocked cache, and a context whose global config
+   * is an empty map; then stubs the no-arg {@link ObjectCache} constructor to return
+   * the mock.
+   *
+   * @throws Exception if the constructor stubbing fails
+   */
+  @Before
+  public void setup() throws Exception {
+    enrichmentObjectGet = new EnrichmentObjectGet();
+    objectCache = mock(ObjectCache.class);
+    context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, HashMap::new)
+            .build();
+
+    whenNew(ObjectCache.class).withNoArguments().thenReturn(objectCache);
+  }
+
+  /**
+   * With an empty global config the cache must be initialized exactly once with a
+   * config equal to {@code new ObjectCacheConfig(new HashMap<>())}.
+   */
+  @Test
+  public void shouldInitializeWithDefaultSettings() throws Exception {
+    when(objectCache.isInitialized()).thenReturn(true);
+
+    // Same precondition as the custom-settings test: nothing is initialized yet.
+    assertFalse(enrichmentObjectGet.isInitialized());
+
+    enrichmentObjectGet.initialize(context);
+
+    ObjectCacheConfig expectedConfig = new ObjectCacheConfig(new HashMap<>());
+
+    verify(objectCache, times(1)).initialize(expectedConfig);
+    assertTrue(enrichmentObjectGet.isInitialized());
+  }
+
+  /**
+   * Settings placed under {@code ENRICHMENT_OBJECT_GET_SETTINGS} in the global config
+   * must be reflected in the config handed to the cache.
+   */
+  @Test
+  public void shouldInitializeWithCustomSettings() throws Exception {
+    // Plain maps rather than double-brace initialization, which would create anonymous
+    // HashMap subclasses that capture this test instance.
+    Map<String, Object> settings = new HashMap<>();
+    settings.put(OBJECT_CACHE_SIZE_KEY, 1);
+    settings.put(OBJECT_CACHE_EXPIRATION_KEY, 2);
+    settings.put(OBJECT_CACHE_TIME_UNIT_KEY, "SECONDS");
+    settings.put(OBJECT_CACHE_MAX_FILE_SIZE_KEY, 3);
+
+    Map<String, Object> globalConfig = new HashMap<>();
+    globalConfig.put(ENRICHMENT_OBJECT_GET_SETTINGS, settings);
+
+    when(objectCache.isInitialized()).thenReturn(true);
+    context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig)
+            .build();
+
+    assertFalse(enrichmentObjectGet.isInitialized());
+
+    enrichmentObjectGet.initialize(context);
+
+    ObjectCacheConfig expectedConfig = new ObjectCacheConfig(new HashMap<>());
+    expectedConfig.setCacheSize(1);
+    expectedConfig.setCacheExpiration(2);
+    expectedConfig.setTimeUnit(TimeUnit.SECONDS);
+    expectedConfig.setMaxFileSize(3);
+
+    verify(objectCache, times(1)).initialize(expectedConfig);
+    assertTrue(enrichmentObjectGet.isInitialized());
+  }
+
+  /**
+   * {@code apply} must return {@code null} before initialization and for null
+   * arguments, and the value stored under the indicator once initialized.
+   */
+  @Test
+  public void shouldApplyEnrichmentObjectGet() {
+    Map<String, Object> enrichment = new HashMap<>();
+    enrichment.put("key", "value");
+    when(objectCache.get("/path")).thenReturn(enrichment);
+
+    // Not initialized yet, so nothing is looked up.
+    assertNull(enrichmentObjectGet.apply(Arrays.asList("/path", "key"), context));
+
+    when(objectCache.isInitialized()).thenReturn(true);
+    enrichmentObjectGet.initialize(context);
+
+    assertNull(enrichmentObjectGet.apply(Arrays.asList(null, null), context));
+    assertEquals("value", enrichmentObjectGet.apply(Arrays.asList("/path", "key"), context));
+  }
+
+  /**
+   * When the cached object is not a map (here a {@code String}), {@code apply} must
+   * fail with a {@link ClassCastException} carrying the expected message.
+   */
+  @Test
+  public void shouldThrowExceptionOnIncorrectObjectFormat() {
+    thrown.expect(ClassCastException.class);
+    thrown.expectMessage("The object stored in HDFS at '/path' must be serialized in JSON format.");
+
+    when(objectCache.get("/path")).thenReturn("incorrect format");
+
+    when(objectCache.isInitialized()).thenReturn(true);
+    enrichmentObjectGet.initialize(context);
+    enrichmentObjectGet.apply(Arrays.asList("/path", "key"), context);
+  }
+
+  /**
+   * An empty argument list must be rejected with an {@link IllegalArgumentException}.
+   *
+   * <p>NOTE(review): the {@code restGet} prefix in the method name looks like a leftover
+   * from another test class; the test exercises {@link EnrichmentObjectGet} only.
+   */
+  @Test
+  public void restGetShouldThrownExceptionOnMissingParameter() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("All parameters are mandatory, submit 'hdfs path', 'indicator'");
+
+    enrichmentObjectGet.apply(new ArrayList<>(), context);
+  }
+}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetIntegrationTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetIntegrationTest.java
new file mode 100644
index 0000000..d3f6f00
--- /dev/null
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetIntegrationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.metron.enrichment.stellar;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class ObjectGetIntegrationTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private File file;
+
+ @Before
+ public void setup() throws Exception {
+ File tempDir = TestUtils.createTempDir(this.getClass().getName());
+ file = new File(tempDir, "object.ser");
+ try(BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file))) {
+ IOUtils.write(SerDeUtils.toBytes("object get data"), bos);
+ }
+ }
+
+ @Test
+ public void shouldReturnEnrichment() {
+ String expression = String.format("OBJECT_GET('%s')", file.getAbsolutePath());
+ String value = (String) StellarProcessorUtils.run(expression, new HashMap<>());
+ assertEquals("object get data", value);
+ }
+
+ @Test
+ public void shouldThrowExceptionOnInvalidPath() {
+ thrown.expect(ParseException.class);
+ thrown.expectMessage("Unable to parse OBJECT_GET('/some/path'): Unable to parse: OBJECT_GET('/some/path') due to: Path '/some/path' could not be found in HDFS");
+
+ String expression = String.format("OBJECT_GET('%s')", "/some/path");
+ StellarProcessorUtils.run(expression, new HashMap<>());
+ }
+}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java
index 400dfb8..d8e2255 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java
@@ -18,73 +18,88 @@
package org.apache.metron.enrichment.stellar;
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.metron.common.utils.SerDeUtils;
-import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
-import org.junit.Assert;
+import org.apache.metron.enrichment.cache.ObjectCache;
+import org.apache.metron.enrichment.cache.ObjectCacheConfig;
+import org.apache.metron.stellar.dsl.Context;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class) // PowerMock runner backs the whenNew(...) constructor stubbing in setup()
+@PrepareForTest({ObjectGet.class, ObjectCache.class})
 public class ObjectGetTest {
- FileSystem fs;
- List<String> data;
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private ObjectGet objectGet;
+ private ObjectCache objectCache;
+ private Context context;
 @Before
- public void setup() throws IOException {
- fs = FileSystem.get(new Configuration());
- data = new ArrayList<>();
- {
- data.add("apache");
- data.add("metron");
- data.add("is");
- data.add("great");
- }
+ public void setup() throws Exception { // fresh function, mocked cache and empty global config per test
+ objectGet = new ObjectGet();
+ objectCache = mock(ObjectCache.class);
+ context = new Context.Builder()
+ .with(Context.Capabilities.GLOBAL_CONFIG, HashMap::new)
+ .build();
+ whenNew(ObjectCache.class).withNoArguments().thenReturn(objectCache); // no-arg construction of ObjectCache now yields the mock
 }
 @Test
- public void test() throws Exception {
- String filename = "target/ogt/test.ser";
- Assert.assertTrue(ObjectGet.cache == null || !ObjectGet.cache.asMap().containsKey(filename));
- assertDataIsReadCorrectly(filename);
+ public void shouldInitialize() throws Exception { // initialize() must configure the cache exactly once
+ when(objectCache.isInitialized()).thenReturn(true);
+
+ assertFalse(objectGet.isInitialized());
+ objectGet.initialize(context);
+
+ ObjectCacheConfig expectedConfig = new ObjectCacheConfig(new HashMap<>()); // built from an empty map, like the global config from setup()
+
+ verify(objectCache, times(1)).initialize(expectedConfig);
+ assertTrue(objectGet.isInitialized());
 }
- public void assertDataIsReadCorrectly(String filename) throws IOException {
- try(BufferedOutputStream bos = new BufferedOutputStream(fs.create(new Path(filename), true))) {
- IOUtils.write(SerDeUtils.toBytes(data), bos);
- }
- List<String> readData = (List<String>) StellarProcessorUtils.run("OBJECT_GET(loc)", ImmutableMap.of("loc", filename));
- Assert.assertEquals(readData, data);
- Assert.assertTrue(ObjectGet.cache.asMap().containsKey(filename));
+ @Test
+ public void shouldApplyObjectGet() { // covers apply() before and after initialization
+ Object object = mock(Object.class);
+ when(objectCache.get("/path")).thenReturn(object);
+
+ assertNull(objectGet.apply(Collections.singletonList("/path"), context)); // not initialized yet: null is expected
+
+ when(objectCache.isInitialized()).thenReturn(true);
+ objectGet.initialize(context);
+
+ assertNull(objectGet.apply(new ArrayList<>(), context)); // no arguments: null
+ assertNull(objectGet.apply(Collections.singletonList(null), context)); // null path: null
+ assertEquals(object, objectGet.apply(Collections.singletonList("/path"), context)); // stubbed path: the cached object
 }
 @Test
- public void testMultithreaded() throws Exception {
- String filename = "target/ogt/testmulti.ser";
- Assert.assertTrue(ObjectGet.cache == null || !ObjectGet.cache.asMap().containsKey(filename));
- Thread[] ts = new Thread[10];
- for(int i = 0;i < ts.length;++i) {
- ts[i] = new Thread(() -> {
- try {
- assertDataIsReadCorrectly(filename);
- } catch (IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- });
- ts[i].start();
- }
- for(Thread t : ts) {
- t.join();
- }
+ public void shouldThrowIllegalStateExceptionOnInvalidPath() { // an Integer argument must be rejected as a path
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("Unable to retrieve 1 as it is not a path");
+
+ when(objectCache.isInitialized()).thenReturn(true);
+ objectGet.initialize(context);
+ objectGet.apply(Collections.singletonList(1), context);
 }
+
 }