You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2019/06/17 18:33:00 UTC

[metron] branch master updated: METRON-2073 Create in-memory use case for enrichment with map type and flatfile summarizer (merrimanr) closes apache/metron#1399

This is an automated email from the ASF dual-hosted git repository.

rmerriman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 38b8a78  METRON-2073 Create in-memory use case for enrichment with map type and flatfile summarizer (merrimanr) closes apache/metron#1399
38b8a78 is described below

commit 38b8a7824a3ceed829215c8ce4ecad963ebf449e
Author: merrimanr <me...@gmail.com>
AuthorDate: Mon Jun 17 13:32:44 2019 -0500

    METRON-2073 Create in-memory use case for enrichment with map type and flatfile summarizer (merrimanr) closes apache/metron#1399
---
 .../metron/enrichment/cache/ObjectCache.java       | 123 +++++++++++++++++
 .../metron/enrichment/cache/ObjectCacheConfig.java | 115 ++++++++++++++++
 .../enrichment/stellar/EnrichmentObjectGet.java    | 101 ++++++++++++++
 .../metron/enrichment/stellar/ObjectGet.java       |  94 ++-----------
 .../ObjectCacheTest.java}                          |  63 ++++++---
 .../EnrichmentObjectGetIntegrationTest.java        |  72 ++++++++++
 .../stellar/EnrichmentObjectGetTest.java           | 152 +++++++++++++++++++++
 .../stellar/ObjectGetIntegrationTest.java          |  70 ++++++++++
 .../metron/enrichment/stellar/ObjectGetTest.java   | 115 +++++++++-------
 9 files changed, 757 insertions(+), 148 deletions(-)

diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/cache/ObjectCache.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/cache/ObjectCache.java
new file mode 100644
index 0000000..9d22bfc
--- /dev/null
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/cache/ObjectCache.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.cache;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A cache of deserialized objects keyed by the HDFS path they were read from.
+ *
+ * <p>Entries are loaded on first access by {@link Loader}.  The cache size, the entry
+ * expiration and the maximum file size that may be loaded are taken from the
+ * {@link ObjectCacheConfig} passed to {@link #initialize(ObjectCacheConfig)}.
+ */
+public class ObjectCache {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected LoadingCache<String, Object> cache;
+  // Guards the cache reference; per instance because the reference it protects is per instance.
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  /**
+   * Reads the file at a path and deserializes its content; used by the cache to load entries.
+   */
+  public class Loader implements CacheLoader<String, Object> {
+    FileSystem fs;
+    ObjectCacheConfig objectCacheConfig;
+
+    public Loader(Configuration hadoopConfig, ObjectCacheConfig objectCacheConfig) throws IOException {
+      this.fs = FileSystem.get(hadoopConfig);
+      this.objectCacheConfig = objectCacheConfig;
+    }
+
+    /**
+     * Loads the object stored at the given path.
+     *
+     * @param s The path of the serialized object
+     * @return The deserialized object, or null if the file is empty
+     * @throws IllegalArgumentException If the path is empty, does not exist or the file is larger
+     *     than the configured max file size
+     */
+    @Override
+    public Object load(String s) throws Exception {
+      LOG.debug("Loading object from path '{}'", s);
+      if (StringUtils.isEmpty(s)) {
+        throw new IllegalArgumentException("Path cannot be empty");
+      }
+      Path p = new Path(s);
+      if (!fs.exists(p)) {
+        throw new IllegalArgumentException(String.format("Path '%s' could not be found in HDFS", s));
+      }
+      // Check the size before opening the file so that oversized files are never read into memory.
+      if (fs.getFileStatus(p).getLen() > objectCacheConfig.getMaxFileSize()) {
+        throw new IllegalArgumentException(String.format("File at path '%s' is larger than the configured max file size of %s", p, objectCacheConfig.getMaxFileSize()));
+      }
+      try (InputStream is = new BufferedInputStream(fs.open(p))) {
+        byte[] serialized = IOUtils.toByteArray(is);
+        return serialized.length > 0 ? SerDeUtils.fromBytes(serialized, Object.class) : null;
+      }
+    }
+  }
+
+  /**
+   * Returns the object stored at the given path, loading it into the cache if necessary.
+   *
+   * @param path The path of the serialized object
+   * @return The deserialized object, or null if the file at the path is empty
+   * @throws IllegalStateException If the cache has not been initialized
+   */
+  public Object get(String path) {
+    LoadingCache<String, Object> current = currentCache();
+    if (current == null) {
+      throw new IllegalStateException("ObjectCache has not been initialized");
+    }
+    // The lookup happens outside the lock so that a slow load never blocks initialize().
+    return current.get(path);
+  }
+
+  /**
+   * Builds the underlying cache from the given settings, replacing any existing cache.
+   *
+   * @param config The cache settings
+   * @throws IllegalStateException If the cache loader cannot be created
+   */
+  public void initialize(ObjectCacheConfig config) {
+    // Acquire the lock before entering the try block so that the finally clause only ever
+    // releases a lock that is actually held.
+    lock.writeLock().lock();
+    try {
+      cache = setupCache(config);
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to initialize: " + e.getMessage(), e);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * @return true if a cache has been built, false otherwise
+   */
+  public boolean isInitialized() {
+    return currentCache() != null;
+  }
+
+  /**
+   * Creates the cache.
+   *
+   * @param config The cache settings
+   * @return A loading cache bounded by the configured size and expiration
+   * @throws IOException If the file system used by the loader cannot be obtained
+   */
+  protected LoadingCache<String, Object> setupCache(ObjectCacheConfig config) throws IOException {
+    LOG.info("Building ObjectCache with {}", config);
+    return Caffeine.newBuilder().maximumSize(config.getCacheSize())
+            .expireAfterWrite(config.getCacheExpiration(), config.getTimeUnit())
+            .removalListener((path, value, removalCause) -> {
+              LOG.debug("Object retrieved from path '{}' was removed with cause {}", path, removalCause);
+            })
+            .build(new Loader(new Configuration(), config));
+  }
+
+  /**
+   * @return true if the cache has not been built or holds no entries
+   */
+  public boolean isEmpty() {
+    LoadingCache<String, Object> current = currentCache();
+    return current == null || current.estimatedSize() == 0;
+  }
+
+  /**
+   * @param key The path to check
+   * @return true if an object for the path is currently cached
+   */
+  public boolean containsKey(String key) {
+    LoadingCache<String, Object> current = currentCache();
+    return current != null && current.asMap().containsKey(key);
+  }
+
+  /**
+   * Reads the cache reference under the read lock so that a cache published by
+   * initialize() on another thread is visible to the caller.
+   */
+  private LoadingCache<String, Object> currentCache() {
+    lock.readLock().lock();
+    try {
+      return cache;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/cache/ObjectCacheConfig.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/cache/ObjectCacheConfig.java
new file mode 100644
index 0000000..2c0c97e
--- /dev/null
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/cache/ObjectCacheConfig.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.cache;
+
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Settings used to build an {@link ObjectCache}: the maximum number of cached objects, how
+ * long an object stays cached and the largest file that may be loaded.
+ */
+public class ObjectCacheConfig {
+
+  public static final String OBJECT_CACHE_SIZE_KEY = "object.cache.size";
+  public static final String OBJECT_CACHE_EXPIRATION_MINUTES_KEY = "object.cache.expiration.minutes";
+  public static final String OBJECT_CACHE_EXPIRATION_KEY = "object.cache.expiration";
+  public static final String OBJECT_CACHE_TIME_UNIT_KEY = "object.cache.time.unit";
+  public static final String OBJECT_CACHE_MAX_FILE_SIZE_KEY = "object.cache.max.file.size";
+  public static final long OBJECT_CACHE_SIZE_DEFAULT = 1000;
+  public static final long OBJECT_CACHE_EXPIRATION_MIN_DEFAULT = 1440;
+  public static final TimeUnit OBJECT_CACHE_TIME_UNIT_DEFAULT = TimeUnit.MINUTES;
+  public static final long OBJECT_CACHE_MAX_FILE_SIZE_DEFAULT = 1048576; // default to 1 mb
+
+  private long cacheSize;
+  private long cacheExpiration;
+  private TimeUnit timeUnit;
+  private long maxFileSize;
+
+  /**
+   * Reads the cache settings from a config map, using the defaults for any missing key.
+   *
+   * <p>When the expiration-in-minutes key is present it takes precedence and the time unit is
+   * the default unit; otherwise the expiration and time unit keys are used.
+   *
+   * @param config The settings keyed by the constants defined in this class
+   */
+  public ObjectCacheConfig(Map<String, Object> config) {
+    cacheSize = longSetting(config, OBJECT_CACHE_SIZE_KEY, OBJECT_CACHE_SIZE_DEFAULT);
+    if (config.containsKey(OBJECT_CACHE_EXPIRATION_MINUTES_KEY)) {
+      cacheExpiration = longSetting(config, OBJECT_CACHE_EXPIRATION_MINUTES_KEY, OBJECT_CACHE_EXPIRATION_MIN_DEFAULT);
+      timeUnit = OBJECT_CACHE_TIME_UNIT_DEFAULT;
+    } else {
+      cacheExpiration = longSetting(config, OBJECT_CACHE_EXPIRATION_KEY, OBJECT_CACHE_EXPIRATION_MIN_DEFAULT);
+      if (config.containsKey(OBJECT_CACHE_TIME_UNIT_KEY)) {
+        timeUnit = TimeUnit.valueOf((String) config.get(OBJECT_CACHE_TIME_UNIT_KEY));
+      } else {
+        timeUnit = OBJECT_CACHE_TIME_UNIT_DEFAULT;
+      }
+    }
+    maxFileSize = longSetting(config, OBJECT_CACHE_MAX_FILE_SIZE_KEY, OBJECT_CACHE_MAX_FILE_SIZE_DEFAULT);
+  }
+
+  /**
+   * Looks up a setting, falling back to the given default, and converts it to a long.
+   */
+  private static long longSetting(Map<String, Object> config, String key, long defaultValue) {
+    return ConversionUtils.convert(config.getOrDefault(key, defaultValue), Long.class);
+  }
+
+  public long getCacheSize() {
+    return cacheSize;
+  }
+
+  public void setCacheSize(long cacheSize) {
+    this.cacheSize = cacheSize;
+  }
+
+  public long getCacheExpiration() {
+    return cacheExpiration;
+  }
+
+  public void setCacheExpiration(long cacheExpiration) {
+    this.cacheExpiration = cacheExpiration;
+  }
+
+  public TimeUnit getTimeUnit() {
+    return timeUnit;
+  }
+
+  public void setTimeUnit(TimeUnit timeUnit) {
+    this.timeUnit = timeUnit;
+  }
+
+  public long getMaxFileSize() {
+    return maxFileSize;
+  }
+
+  public void setMaxFileSize(long maxFileSize) {
+    this.maxFileSize = maxFileSize;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ObjectCacheConfig other = (ObjectCacheConfig) o;
+    return cacheSize == other.cacheSize
+            && cacheExpiration == other.cacheExpiration
+            && timeUnit == other.timeUnit
+            && maxFileSize == other.maxFileSize;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(cacheSize, cacheExpiration, timeUnit, maxFileSize);
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("ObjectCacheConfig{")
+            .append("cacheSize=").append(cacheSize)
+            .append(", cacheExpiration=").append(cacheExpiration)
+            .append(", timeUnit=").append(timeUnit)
+            .append(", maxFileSize=").append(maxFileSize)
+            .append('}')
+            .toString();
+  }
+}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGet.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGet.java
new file mode 100644
index 0000000..11ebe88
--- /dev/null
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGet.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.stellar;
+
+import org.apache.metron.enrichment.cache.ObjectCache;
+import org.apache.metron.enrichment.cache.ObjectCacheConfig;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.metron.enrichment.stellar.EnrichmentObjectGet.ENRICHMENT_OBJECT_GET_SETTINGS;
+
+/**
+ * Stellar function that looks up the value for an indicator in a map that was serialized to
+ * HDFS.  The deserialized map is held in an {@link ObjectCache} owned by this function.
+ */
+@Stellar(namespace="ENRICHMENT"
+        ,name="OBJECT_GET"
+        ,description="Retrieve and deserialize a serialized object from HDFS and stores it in the ObjectCache,  " +
+        "then returns the value associated with the indicator.  " +
+        "The cache can be specified via three properties in the global config: " +
+        "\"" + ObjectCacheConfig.OBJECT_CACHE_SIZE_KEY + "\" (default " + ObjectCacheConfig.OBJECT_CACHE_SIZE_DEFAULT + "), " +
+        "\"" + ObjectCacheConfig.OBJECT_CACHE_EXPIRATION_KEY + "\" (default " + ObjectCacheConfig.OBJECT_CACHE_EXPIRATION_MIN_DEFAULT + "), " +
+        "\"" + ObjectCacheConfig.OBJECT_CACHE_TIME_UNIT_KEY+ "\" (default MINUTES).  " +
+        "Cache settings that apply only to this function can also be specified in the global config by nesting the settings above under the " + ENRICHMENT_OBJECT_GET_SETTINGS + " key.  " +
+        "Note, if these are changed in global config, topology restart is required."
+        , params = {
+            "path - The path in HDFS to the serialized object",
+            "indicator - The string indicator to look up"
+          }
+        , returns="Value associated with the indicator."
+)
+public class EnrichmentObjectGet implements StellarFunction {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public final static String ENRICHMENT_OBJECT_GET_SETTINGS = "enrichment.object.get.settings";
+
+  private ObjectCache objectCache;
+
+  /**
+   * Returns the value stored under the indicator in the map found at the given path.
+   *
+   * @param args The HDFS path of the serialized map followed by the indicator to look up
+   * @param context The Stellar context (not used)
+   * @return The value for the indicator; null if the function is not initialized, if either
+   *     argument is null, if nothing was loaded from the path or if the map has no such indicator
+   * @throws IllegalArgumentException If the number of arguments is not 2
+   * @throws ClassCastException If the object at the path is not a map
+   */
+  @Override
+  public Object apply(List<Object> args, Context context) throws ParseException {
+    if(args.size() != 2) {
+      throw new IllegalArgumentException("All parameters are mandatory, submit 'hdfs path', 'indicator'");
+    }
+    if(!isInitialized()) {
+      return null;
+    }
+
+    String path = (String) args.get(0);
+    String indicator = (String) args.get(1);
+    if(path == null || indicator == null) {
+      return null;
+    }
+
+    Object cached = objectCache.get(path);
+    if(cached == null) {
+      // Nothing was loaded from the path (e.g. an empty file), so there is no map to search.
+      return null;
+    }
+    // Check the type explicitly instead of catching ClassCastException so that an exception
+    // raised inside the map lookup itself is not misreported as a serialization problem.
+    if(!(cached instanceof Map)) {
+      throw new ClassCastException(String.format("The object stored in HDFS at '%s' must be serialized in JSON format.", path));
+    }
+    LOG.debug("Looking up value from object at path '{}' using indicator {}", path, indicator);
+    return ((Map<?, ?>) cached).get(indicator);
+  }
+
+  /**
+   * Builds the object cache from the settings nested under the
+   * "enrichment.object.get.settings" key of the global config; defaults apply when the global
+   * config or that key is absent.
+   *
+   * @param context The Stellar context providing the global config
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void initialize(Context context) {
+    Map<String, Object> config = (Map<String, Object>) context.getCapability(Context.Capabilities.GLOBAL_CONFIG, false)
+            .orElse(new HashMap<>());
+    Map<String, Object> enrichmentGetConfig = (Map<String, Object>) config.getOrDefault(ENRICHMENT_OBJECT_GET_SETTINGS, new HashMap<>());
+    ObjectCacheConfig objectCacheConfig = new ObjectCacheConfig(enrichmentGetConfig);
+    objectCache = new ObjectCache();
+    objectCache.initialize(objectCacheConfig);
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return objectCache != null && objectCache.isInitialized();
+  }
+}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java
index ebb94da..dddef5f 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java
@@ -18,16 +18,8 @@
 
 package org.apache.metron.enrichment.stellar;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.metron.common.utils.SerDeUtils;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.enrichment.cache.ObjectCache;
+import org.apache.metron.enrichment.cache.ObjectCacheConfig;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.ParseException;
 import org.apache.metron.stellar.dsl.Stellar;
@@ -35,26 +27,19 @@ import org.apache.metron.stellar.dsl.StellarFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 @Stellar(namespace="OBJECT"
         ,name="GET"
         ,description="Retrieve and deserialize a serialized object from HDFS.  " +
-        "The cache can be specified via two properties in the global config: " +
-        "\"" + ObjectGet.OBJECT_CACHE_SIZE_KEY + "\" (default " + ObjectGet.OBJECT_CACHE_SIZE_DEFAULT + ")," +
-        "\"" + ObjectGet.OBJECT_CACHE_EXPIRATION_KEY+ "\" (default 1440).  Note, if these are changed in global config, " +
-        "topology restart is required."
+        "The cache can be specified via three properties in the global config: " +
+        "\"" + ObjectCacheConfig.OBJECT_CACHE_SIZE_KEY + "\" (default " + ObjectCacheConfig.OBJECT_CACHE_SIZE_DEFAULT + ")," +
+        "\"" + ObjectCacheConfig.OBJECT_CACHE_EXPIRATION_KEY + "\" (default " + ObjectCacheConfig.OBJECT_CACHE_EXPIRATION_MIN_DEFAULT + ")," +
+        "\"" + ObjectCacheConfig.OBJECT_CACHE_TIME_UNIT_KEY+ "\" (default MINUTES)." +
+        "Note, if these are changed in global config, topology restart is required."
         , params = {
             "path - The path in HDFS to the serialized object"
           }
@@ -62,36 +47,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 )
 public class ObjectGet implements StellarFunction {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  public static final String OBJECT_CACHE_SIZE_KEY = "object.cache.size";
-  public static final String OBJECT_CACHE_EXPIRATION_KEY = "object.cache.expiration.minutes";
-  public static final int OBJECT_CACHE_SIZE_DEFAULT = 1000;
-  public static final long OBJECT_CACHE_EXPIRATION_MIN_DEFAULT = TimeUnit.HOURS.toMinutes(24);
-  protected static LoadingCache<String, Object> cache;
-  private static ReadWriteLock lock = new ReentrantReadWriteLock();
 
-  public static class Loader extends CacheLoader<String, Object> {
-    FileSystem fs;
-    public Loader(Configuration hadoopConfig) throws IOException {
-      this.fs = FileSystem.get(hadoopConfig);
-    }
-    @Override
-    public Object load(String s) throws Exception {
-      if(StringUtils.isEmpty(s)) {
-        return null;
-      }
-      Path p = new Path(s);
-      if(fs.exists(p)) {
-        try(InputStream is = new BufferedInputStream(fs.open(p))) {
-          byte[] serialized = IOUtils.toByteArray(is);
-          if(serialized.length > 0) {
-            Object ret = SerDeUtils.fromBytes(serialized, Object.class);
-            return ret;
-          }
-        }
-      }
-      return null;
-    }
-  }
+  private ObjectCache objectCache;
 
   @Override
   public Object apply(List<Object> args, Context context) throws ParseException {
@@ -106,11 +63,7 @@ public class ObjectGet implements StellarFunction {
       return null;
     }
     if(o instanceof String) {
-      try {
-        return cache.get((String)o);
-      } catch (ExecutionException e) {
-        throw new IllegalStateException("Unable to retrieve " + o + " because " + e.getMessage(), e);
-      }
+      return objectCache.get((String) o);
     }
     else {
       throw new IllegalStateException("Unable to retrieve " + o + " as it is not a path");
@@ -119,35 +72,14 @@ public class ObjectGet implements StellarFunction {
 
   @Override
   public void initialize(Context context) {
-    try {
-      lock.writeLock().lock();
-      Map<String, Object> config = getConfig(context);
-      long size = ConversionUtils.convert(config.getOrDefault(OBJECT_CACHE_SIZE_KEY, OBJECT_CACHE_SIZE_DEFAULT), Long.class);
-      long expiryMin = ConversionUtils.convert(config.getOrDefault(OBJECT_CACHE_EXPIRATION_KEY, OBJECT_CACHE_EXPIRATION_MIN_DEFAULT), Long.class);
-      cache = setupCache(size, expiryMin);
-    } catch (IOException e) {
-      throw new IllegalStateException("Unable to initialize: " + e.getMessage(), e);
-    } finally {
-      lock.writeLock().unlock();
-    }
+    Map<String, Object> config = getConfig(context);
+    objectCache = new ObjectCache();
+    objectCache.initialize(new ObjectCacheConfig(config));
   }
 
   @Override
   public boolean isInitialized() {
-    try {
-      lock.readLock().lock();
-      return cache != null;
-    }
-    finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  protected LoadingCache<String, Object> setupCache(long size, long expiryMin) throws IOException {
-    return CacheBuilder.newBuilder()
-                       .maximumSize(size)
-                       .expireAfterAccess(expiryMin, TimeUnit.MINUTES)
-                       .build(new Loader(new Configuration()));
+    return objectCache != null && objectCache.isInitialized();
   }
 
   protected Map<String, Object> getConfig(Context context) {
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/cache/ObjectCacheTest.java
similarity index 54%
copy from metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java
copy to metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/cache/ObjectCacheTest.java
index 400dfb8..4ad6494 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/cache/ObjectCacheTest.java
@@ -16,28 +16,35 @@
  * limitations under the License.
  */
 
-package org.apache.metron.enrichment.stellar;
+package org.apache.metron.enrichment.cache;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.utils.SerDeUtils;
-import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
+import org.apache.metron.integration.utils.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
-import java.io.BufferedOutputStream;
-import java.io.IOException;
+import java.io.*;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
-public class ObjectGetTest {
-  FileSystem fs;
-  List<String> data;
+public class ObjectCacheTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private FileSystem fs;
+  private List<String> data;
+  private ObjectCache cache;
+  private File tempDir;
 
   @Before
   public void setup() throws IOException {
@@ -49,35 +56,38 @@ public class ObjectGetTest {
       data.add("is");
       data.add("great");
     }
-
+    cache = new ObjectCache();
+    tempDir = TestUtils.createTempDir(this.getClass().getName());
   }
 
   @Test
   public void test() throws Exception {
-    String filename = "target/ogt/test.ser";
-    Assert.assertTrue(ObjectGet.cache == null || !ObjectGet.cache.asMap().containsKey(filename));
+    String filename = "test.ser";
+    Assert.assertTrue(cache.isEmpty() || !cache.containsKey(filename));
     assertDataIsReadCorrectly(filename);
   }
 
   public void assertDataIsReadCorrectly(String filename) throws IOException {
-    try(BufferedOutputStream bos = new BufferedOutputStream(fs.create(new Path(filename), true))) {
+    File file = new File(tempDir, filename);
+    try(BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file))) {
       IOUtils.write(SerDeUtils.toBytes(data), bos);
     }
-    List<String> readData = (List<String>) StellarProcessorUtils.run("OBJECT_GET(loc)", ImmutableMap.of("loc", filename));
+    cache.initialize(new ObjectCacheConfig(new HashMap<>()));
+    List<String> readData = (List<String>) cache.get(file.getAbsolutePath());
     Assert.assertEquals(readData, data);
-    Assert.assertTrue(ObjectGet.cache.asMap().containsKey(filename));
+    Assert.assertTrue(cache.containsKey(file.getAbsolutePath()));
   }
 
   @Test
   public void testMultithreaded() throws Exception {
-    String filename = "target/ogt/testmulti.ser";
-    Assert.assertTrue(ObjectGet.cache == null || !ObjectGet.cache.asMap().containsKey(filename));
+    String filename = "testmulti.ser";
+    Assert.assertTrue(cache.isEmpty() || !cache.containsKey(filename));
     Thread[] ts = new Thread[10];
     for(int i = 0;i < ts.length;++i) {
       ts[i] = new Thread(() -> {
         try {
           assertDataIsReadCorrectly(filename);
-        } catch (IOException e) {
+        } catch (Exception e) {
           throw new IllegalStateException(e.getMessage(), e);
         }
       });
@@ -87,4 +97,23 @@ public class ObjectGetTest {
       t.join();
     }
   }
+
+  @Test
+  public void shouldThrowExceptionOnMaxFileSize() throws Exception {
+    String filename = "maxSizeException.ser";
+    File file = new File(tempDir, filename);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(String.format("File at path '%s' is larger than the configured max file size of 1", file.getAbsolutePath()));
+
+
+    try(BufferedOutputStream bos = new BufferedOutputStream(fs.create(new Path(file.getAbsolutePath()), true))) {
+      IOUtils.write(SerDeUtils.toBytes(data), bos);
+    }
+    ObjectCacheConfig objectCacheConfig = new ObjectCacheConfig(new HashMap<>());
+    objectCacheConfig.setMaxFileSize(1);
+    cache.initialize(objectCacheConfig);
+
+    cache.get(file.getAbsolutePath());
+  }
 }
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGetIntegrationTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGetIntegrationTest.java
new file mode 100644
index 0000000..b9ebb3f
--- /dev/null
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGetIntegrationTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.metron.enrichment.stellar;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Runs ENRICHMENT_OBJECT_GET through the Stellar processor against a serialized map written
+ * to a temp file.
+ */
+public class EnrichmentObjectGetIntegrationTest {
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    private File file;
+
+    /**
+     * Serializes a single-entry map into a temp directory for the tests to read back.
+     */
+    @Before
+    public void setup() throws Exception {
+        file = new File(TestUtils.createTempDir(this.getClass().getName()), "enrichment.ser");
+        try(BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file))) {
+            HashMap<String, Object> enrichments = new HashMap<String, Object>() {{
+                put("key", "value");
+            }};
+            IOUtils.write(SerDeUtils.toBytes(enrichments), bos);
+        }
+    }
+
+    @Test
+    public void shouldReturnEnrichment() {
+        String path = file.getAbsolutePath();
+        String expression = String.format("ENRICHMENT_OBJECT_GET('%s', '%s')", path, "key");
+        String value = (String) StellarProcessorUtils.run(expression, new HashMap<>());
+        assertEquals("value", value);
+    }
+
+    @Test
+    public void shouldThrowExceptionOnInvalidPath() {
+        String expression = String.format("ENRICHMENT_OBJECT_GET('%s', '%s')", "/some/path", "key");
+        thrown.expect(ParseException.class);
+        thrown.expectMessage("Unable to parse ENRICHMENT_OBJECT_GET('/some/path', 'key'): Unable to parse: ENRICHMENT_OBJECT_GET('/some/path', 'key') due to: Path '/some/path' could not be found in HDFS");
+
+        StellarProcessorUtils.run(expression, new HashMap<>());
+    }
+}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGetTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGetTest.java
new file mode 100644
index 0000000..b12e666
--- /dev/null
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/EnrichmentObjectGetTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.stellar;
+
+import org.apache.metron.enrichment.cache.ObjectCache;
+import org.apache.metron.enrichment.cache.ObjectCacheConfig;
+import org.apache.metron.stellar.dsl.Context;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.enrichment.cache.ObjectCacheConfig.OBJECT_CACHE_EXPIRATION_KEY;
+import static org.apache.metron.enrichment.cache.ObjectCacheConfig.OBJECT_CACHE_MAX_FILE_SIZE_KEY;
+import static org.apache.metron.enrichment.cache.ObjectCacheConfig.OBJECT_CACHE_SIZE_KEY;
+import static org.apache.metron.enrichment.cache.ObjectCacheConfig.OBJECT_CACHE_TIME_UNIT_KEY;
+import static org.apache.metron.enrichment.stellar.EnrichmentObjectGet.ENRICHMENT_OBJECT_GET_SETTINGS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for {@link EnrichmentObjectGet} that run against a mocked {@link ObjectCache}.
+ * PowerMock makes the no-argument {@code ObjectCache} constructor return that mock.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({EnrichmentObjectGet.class, ObjectCache.class})
+public class EnrichmentObjectGetTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private EnrichmentObjectGet enrichmentObjectGet;
+  private ObjectCache objectCache;
+  private Context context;
+
+  /**
+   * Creates the function under test, a mock cache and a context whose global config is an
+   * empty map, then routes {@code new ObjectCache()} to the mock.
+   */
+  @Before
+  public void setup() throws Exception {
+    enrichmentObjectGet = new EnrichmentObjectGet();
+    objectCache = mock(ObjectCache.class);
+    context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, HashMap::new)
+            .build();
+
+    whenNew(ObjectCache.class).withNoArguments().thenReturn(objectCache);
+  }
+
+  /**
+   * With an empty global config the cache should be initialized exactly once with a config
+   * equal to one built from an empty settings map.
+   */
+  @Test
+  public void shouldInitializeWithDefaultSettings() throws Exception {
+    when(objectCache.isInitialized()).thenReturn(true);
+
+    // Same precondition the custom-settings test checks: not initialized until initialize() runs.
+    assertFalse(enrichmentObjectGet.isInitialized());
+
+    enrichmentObjectGet.initialize(context);
+
+    ObjectCacheConfig expectedConfig = new ObjectCacheConfig(new HashMap<>());
+
+    verify(objectCache, times(1)).initialize(expectedConfig);
+    assertTrue(enrichmentObjectGet.isInitialized());
+  }
+
+  /**
+   * Settings placed under {@code ENRICHMENT_OBJECT_GET_SETTINGS} in the global config should be
+   * carried into the config passed to the cache.
+   */
+  @Test
+  public void shouldInitializeWithCustomSettings() throws Exception {
+    Map<String, Object> settings = new HashMap<>();
+    settings.put(OBJECT_CACHE_SIZE_KEY, 1);
+    settings.put(OBJECT_CACHE_EXPIRATION_KEY, 2);
+    settings.put(OBJECT_CACHE_TIME_UNIT_KEY, "SECONDS");
+    settings.put(OBJECT_CACHE_MAX_FILE_SIZE_KEY, 3);
+
+    Map<String, Object> globalConfig = new HashMap<>();
+    globalConfig.put(ENRICHMENT_OBJECT_GET_SETTINGS, settings);
+
+    when(objectCache.isInitialized()).thenReturn(true);
+    context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig)
+            .build();
+
+    assertFalse(enrichmentObjectGet.isInitialized());
+
+    enrichmentObjectGet.initialize(context);
+
+    ObjectCacheConfig expectedConfig = new ObjectCacheConfig(new HashMap<>());
+    expectedConfig.setCacheSize(1);
+    expectedConfig.setCacheExpiration(2);
+    expectedConfig.setTimeUnit(TimeUnit.SECONDS);
+    expectedConfig.setMaxFileSize(3);
+
+    verify(objectCache, times(1)).initialize(expectedConfig);
+    assertTrue(enrichmentObjectGet.isInitialized());
+  }
+
+  /**
+   * {@code apply} should return null before initialization and for null arguments, and the
+   * mapped value once initialized and given a path and key that the cached map contains.
+   */
+  @Test
+  public void shouldApplyEnrichmentObjectGet() {
+    Map<String, Object> enrichment = new HashMap<>();
+    enrichment.put("key", "value");
+    when(objectCache.get("/path")).thenReturn(enrichment);
+
+    // Not initialized yet, so nothing is returned.
+    assertNull(enrichmentObjectGet.apply(Arrays.asList("/path", "key"), context));
+
+    when(objectCache.isInitialized()).thenReturn(true);
+    enrichmentObjectGet.initialize(context);
+
+    assertNull(enrichmentObjectGet.apply(Arrays.asList(null, null), context));
+    assertEquals("value", enrichmentObjectGet.apply(Arrays.asList("/path", "key"), context));
+  }
+
+  /**
+   * A cached object that is not a map should be rejected with a {@link ClassCastException}.
+   */
+  @Test
+  public void shouldThrowExceptionOnIncorrectObjectFormat() {
+    thrown.expect(ClassCastException.class);
+    thrown.expectMessage("The object stored in HDFS at '/path' must be serialized in JSON format.");
+
+    when(objectCache.get("/path")).thenReturn("incorrect format");
+
+    when(objectCache.isInitialized()).thenReturn(true);
+    enrichmentObjectGet.initialize(context);
+    enrichmentObjectGet.apply(Arrays.asList("/path", "key"), context);
+  }
+
+  /**
+   * Calling {@code apply} with no arguments should be rejected with an
+   * {@link IllegalArgumentException} that lists the mandatory parameters.
+   */
+  @Test
+  public void shouldThrowExceptionOnMissingParameter() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("All parameters are mandatory, submit 'hdfs path', 'indicator'");
+
+    enrichmentObjectGet.apply(new ArrayList<>(), context);
+  }
+
+}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetIntegrationTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetIntegrationTest.java
new file mode 100644
index 0000000..d3f6f00
--- /dev/null
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetIntegrationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.metron.enrichment.stellar;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test that evaluates the {@code OBJECT_GET} Stellar function end to end through
+ * {@link StellarProcessorUtils}, reading a serialized string written to a temporary file.
+ */
+public class ObjectGetIntegrationTest {
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    private File file;
+
+    /**
+     * Serializes the string {@code "object get data"} into {@code object.ser} inside a temp
+     * directory named after this test class.
+     *
+     * @throws Exception if the fixture file cannot be created or written
+     */
+    @Before
+    public void setup() throws Exception {
+        String dirName = this.getClass().getName();
+        file = new File(TestUtils.createTempDir(dirName), "object.ser");
+        try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(file))) {
+            IOUtils.write(SerDeUtils.toBytes("object get data"), out);
+        }
+    }
+
+    /**
+     * Reading the fixture file back should yield the original string.
+     */
+    @Test
+    public void shouldReturnEnrichment() {
+        String objectGetCall = String.format("OBJECT_GET('%s')", file.getAbsolutePath());
+        Object result = StellarProcessorUtils.run(objectGetCall, new HashMap<>());
+        assertEquals("object get data", (String) result);
+    }
+
+    /**
+     * A path that does not exist should surface as a {@link ParseException} whose message
+     * names the missing path.
+     */
+    @Test
+    public void shouldThrowExceptionOnInvalidPath() {
+        String missingPathCall = String.format("OBJECT_GET('%s')", "/some/path");
+
+        thrown.expect(ParseException.class);
+        thrown.expectMessage("Unable to parse OBJECT_GET('/some/path'): Unable to parse: OBJECT_GET('/some/path') due to: Path '/some/path' could not be found in HDFS");
+
+        StellarProcessorUtils.run(missingPathCall, new HashMap<>());
+    }
+}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java
index 400dfb8..d8e2255 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java
+++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java
@@ -18,73 +18,88 @@
 
 package org.apache.metron.enrichment.stellar;
 
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.metron.common.utils.SerDeUtils;
-import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
-import org.junit.Assert;
+import org.apache.metron.enrichment.cache.ObjectCache;
+import org.apache.metron.enrichment.cache.ObjectCacheConfig;
+import org.apache.metron.stellar.dsl.Context;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.io.BufferedOutputStream;
-import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for {@link ObjectGet} that run against a mocked {@link ObjectCache}.
+ * PowerMock makes the no-argument {@code ObjectCache} constructor return that mock.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ObjectGet.class, ObjectCache.class})
 public class ObjectGetTest {
-  FileSystem fs;
-  List<String> data;
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private ObjectGet objectGet;
+  private ObjectCache objectCache;
+  private Context context;
 
+  /**
+   * Creates the function under test, a mock cache and a context whose global config is an
+   * empty map, then routes {@code new ObjectCache()} to the mock.
+   */
   @Before
-  public void setup() throws IOException {
-    fs = FileSystem.get(new Configuration());
-    data = new ArrayList<>();
-    {
-      data.add("apache");
-      data.add("metron");
-      data.add("is");
-      data.add("great");
-    }
+  public void setup() throws Exception {
+    objectGet = new ObjectGet();
+    objectCache = mock(ObjectCache.class);
+    context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, HashMap::new)
+            .build();
 
+    whenNew(ObjectCache.class).withNoArguments().thenReturn(objectCache);
   }
 
+  /**
+   * The function should report uninitialized before {@code initialize}, and afterwards the cache
+   * should have been initialized exactly once with a config built from an empty settings map.
+   */
   @Test
-  public void test() throws Exception {
-    String filename = "target/ogt/test.ser";
-    Assert.assertTrue(ObjectGet.cache == null || !ObjectGet.cache.asMap().containsKey(filename));
-    assertDataIsReadCorrectly(filename);
+  public void shouldInitialize() throws Exception {
+    when(objectCache.isInitialized()).thenReturn(true);
+
+    assertFalse(objectGet.isInitialized());
+    objectGet.initialize(context);
+
+    ObjectCacheConfig expectedConfig = new ObjectCacheConfig(new HashMap<>());
+
+    verify(objectCache, times(1)).initialize(expectedConfig);
+    assertTrue(objectGet.isInitialized());
   }
 
-  public void assertDataIsReadCorrectly(String filename) throws IOException {
-    try(BufferedOutputStream bos = new BufferedOutputStream(fs.create(new Path(filename), true))) {
-      IOUtils.write(SerDeUtils.toBytes(data), bos);
-    }
-    List<String> readData = (List<String>) StellarProcessorUtils.run("OBJECT_GET(loc)", ImmutableMap.of("loc", filename));
-    Assert.assertEquals(readData, data);
-    Assert.assertTrue(ObjectGet.cache.asMap().containsKey(filename));
+  /**
+   * {@code apply} should return null before initialization, for an empty argument list and for
+   * a null path, and the cached object once initialized and given a path the cache knows.
+   */
+  @Test
+  public void shouldApplyObjectGet() {
+    Object object = mock(Object.class);
+    when(objectCache.get("/path")).thenReturn(object);
+
+    // Not initialized yet, so nothing is returned.
+    assertNull(objectGet.apply(Collections.singletonList("/path"), context));
+
+    when(objectCache.isInitialized()).thenReturn(true);
+    objectGet.initialize(context);
+
+    assertNull(objectGet.apply(new ArrayList<>(), context));
+    assertNull(objectGet.apply(Collections.singletonList(null), context));
+    assertEquals(object, objectGet.apply(Collections.singletonList("/path"), context));
   }
 
+  /**
+   * A non-string path argument (here the integer 1) should be rejected with an
+   * {@link IllegalStateException}.
+   */
   @Test
-  public void testMultithreaded() throws Exception {
-    String filename = "target/ogt/testmulti.ser";
-    Assert.assertTrue(ObjectGet.cache == null || !ObjectGet.cache.asMap().containsKey(filename));
-    Thread[] ts = new Thread[10];
-    for(int i = 0;i < ts.length;++i) {
-      ts[i] = new Thread(() -> {
-        try {
-          assertDataIsReadCorrectly(filename);
-        } catch (IOException e) {
-          throw new IllegalStateException(e.getMessage(), e);
-        }
-      });
-      ts[i].start();
-    }
-    for(Thread t : ts) {
-      t.join();
-    }
+  public void shouldThrowIllegalStateExceptionOnInvalidPath() {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Unable to retrieve 1 as it is not a path");
+
+    when(objectCache.isInitialized()).thenReturn(true);
+    objectGet.initialize(context);
+    objectGet.apply(Collections.singletonList(1), context);
   }
+
 }