You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/05/18 20:41:01 UTC

apex-malhar git commit: APEXMALHAR-2473 Support for global cache meta information in db CacheManager

Repository: apex-malhar
Updated Branches:
  refs/heads/master c3f86f237 -> 118a75f5e


APEXMALHAR-2473 Support for global cache meta information in db CacheManager

  1. Uses Component interface and newly implemented CacheContext to pass properties to the Stores by calling setup(CacheContext).
  2. APEXMALHAR-2474: FSLoader implements Component to get numInitLinesToCache from CacheManager and use it in initial load.
	Add implementation of get() function so data will also be loaded after initial load.
  3. APEXMALHAR-2475: CacheStore implements Component for passing readOnly and numInitLinesToCache.
	Added NO_EVICTION expiry strategy. This strategy will be set in setup(CacheContext) if readOnly is true.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/118a75f5
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/118a75f5
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/118a75f5

Branch: refs/heads/master
Commit: 118a75f5e78fb7d125f84f996f0bb3c157923d25
Parents: c3f86f2
Author: Oliver Winke <ol...@datatorrent.com>
Authored: Mon Apr 17 11:27:01 2017 -0700
Committer: Oliver Winke <ol...@datatorrent.com>
Committed: Thu May 18 11:00:55 2017 -0700

----------------------------------------------------------------------
 .../contrib/enrich/AbstractEnricher.java        |  21 +++-
 .../datatorrent/contrib/enrich/FSLoader.java    |  92 ++++++++++----
 .../datatorrent/lib/db/cache/CacheManager.java  | 119 ++++++++++++++++++-
 .../datatorrent/lib/db/cache/CacheStore.java    |  56 ++++++++-
 .../lib/db/cache/CacheManagerTest.java          |  56 +++++++++
 5 files changed, 312 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/118a75f5/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
index f2f8421..de9e2c7 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
@@ -73,10 +73,11 @@ public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator imple
   /**
    * Helper variables.
    */
-  private transient CacheManager cacheManager;
   protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
   protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
 
+  private CacheManager cacheManager = new NullValuesCacheManager();
+
   /**
    * This method needs to be called by implementing class for processing a tuple for enrichment.
    * The method will take the tuple through following stages:
@@ -154,7 +155,6 @@ public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator imple
   {
     super.setup(context);
 
-    cacheManager = new NullValuesCacheManager();
     CacheStore primaryCache = new CacheStore();
 
     // set expiration to one day.
@@ -185,13 +185,18 @@ public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator imple
     try {
       cacheManager.initialize();
     } catch (IOException e) {
-      throw new RuntimeException("Unable to initialize primary cache", e);
+      throw new RuntimeException("Unable to initialize cache manager", e);
     }
   }
 
   @Override
   public void deactivate()
   {
+    try {
+      cacheManager.close();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to close cache manager", e);
+    }
   }
 
   /**
@@ -320,4 +325,14 @@ public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator imple
   {
     this.cacheSize = cacheSize;
   }
+
+  public CacheManager getCacheManager()
+  {
+    return cacheManager;
+  }
+
+  public void setCacheManager(CacheManager cacheManager)
+  {
+    this.cacheManager = cacheManager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/118a75f5/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
index 464fa99..e04d6c4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -37,6 +36,8 @@ import org.apache.hadoop.fs.Path;
 
 import com.esotericsoftware.kryo.NotNull;
 import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Component;
 import com.datatorrent.lib.db.cache.CacheManager;
 import com.datatorrent.lib.util.FieldInfo;
 
@@ -51,7 +52,7 @@ import com.datatorrent.lib.util.FieldInfo;
  * @since 3.4.0
  */
 @InterfaceStability.Evolving
-public abstract class FSLoader extends ReadOnlyBackup
+public abstract class FSLoader extends ReadOnlyBackup implements Component<CacheManager.CacheContext>
 {
   @NotNull
   private String fileName;
@@ -59,6 +60,7 @@ public abstract class FSLoader extends ReadOnlyBackup
   private transient Path filePath;
   private transient FileSystem fs;
   private transient boolean connected;
+  private transient int numInitCachedLines = -1;
 
   private static final Logger logger = LoggerFactory.getLogger(FSLoader.class);
 
@@ -75,40 +77,29 @@ public abstract class FSLoader extends ReadOnlyBackup
   @Override
   public Map<Object, Object> loadInitialData()
   {
-    Map<Object, Object> result = null;
-    FSDataInputStream in = null;
-    BufferedReader bin = null;
-    try {
+    Map<Object, Object> result;
+    try (
+      FSDataInputStream in = fs.open(filePath);
+      BufferedReader bin = new BufferedReader(new InputStreamReader(in));
+    ) {
       result = Maps.newHashMap();
-      in = fs.open(filePath);
-      bin = new BufferedReader(new InputStreamReader(in));
       String line;
-      while ((line = bin.readLine()) != null) {
+      int linesCount = 0;
+      while ((line = bin.readLine()) != null && (numInitCachedLines < 0 || linesCount < numInitCachedLines)) {
         try {
           Map<String, Object> tuple = extractFields(line);
           if (tuple != null && !tuple.isEmpty()) {
             result.put(getKey(tuple), getValue(tuple));
           }
         } catch (Exception parseExp) {
-          logger.info("Unable to parse line {}", line);
+          throw new RuntimeException(parseExp);
         }
+        ++linesCount;
       }
     } catch (IOException e) {
       throw new RuntimeException(e);
-    } finally {
-      if (bin != null) {
-        IOUtils.closeQuietly(bin);
-      }
-      if (in != null) {
-        IOUtils.closeQuietly(in);
-      }
-      try {
-        fs.close();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
     }
-    logger.debug("loading initial data {}", result.size());
+    logger.debug("loading initial data, size: {}", result.size());
     return result;
   }
 
@@ -145,9 +136,44 @@ public abstract class FSLoader extends ReadOnlyBackup
   @Override
   public Object get(Object key)
   {
+    try (
+      FSDataInputStream in = openWithRetryFsInputStream(filePath);
+      BufferedReader bin = new BufferedReader(new InputStreamReader(in));
+      ) {
+      String line;
+      while ((line = bin.readLine()) != null) {
+        try {
+          Map<String, Object> tuple = extractFields(line);
+          if (tuple != null && !tuple.isEmpty() && getKey(tuple).equals(key)) {
+            logger.debug("Found line in FS {}", getValue(tuple));
+            return getValue(tuple);
+          }
+        } catch (Exception parseExp) {
+          throw new RuntimeException(parseExp);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
     return null;
   }
 
+  private FSDataInputStream openWithRetryFsInputStream(Path filePath)
+  {
+    FSDataInputStream in;
+    try {
+      in = fs.open(filePath);
+    } catch (IOException e) {
+      try {
+        this.connect();
+        in = fs.open(filePath);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    return in;
+  }
+
   @Override
   public List<Object> getAll(List<Object> keys)
   {
@@ -179,4 +205,24 @@ public abstract class FSLoader extends ReadOnlyBackup
   {
     return connected;
   }
+
+  /**
+   * Callback to give the component a chance to perform tasks required as part of setting itself up.
+   * This callback is made exactly once during the operator lifetime.
+   *
+   * @param context - CacheContext with AttributeMap passed by CacheManager
+   */
+  @Override
+  public void setup(CacheManager.CacheContext context)
+  {
+    if (context != null) {
+      numInitCachedLines = context.getValue(CacheManager.CacheContext.NUM_INIT_CACHED_LINES_ATTR);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/118a75f5/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java b/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
index 2ea31f1..4724761 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
@@ -21,6 +21,7 @@ package com.datatorrent.lib.db.cache;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Calendar;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,12 +32,16 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 
+import org.codehaus.jackson.annotate.JsonIgnore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
 import com.datatorrent.lib.db.KeyValueStore;
 
 /**
@@ -56,12 +61,16 @@ import com.datatorrent.lib.db.KeyValueStore;
 public class CacheManager implements Closeable
 {
   @NotNull
-  protected Primary primary;
+  protected transient Primary primary;
   @NotNull
-  protected Backup backup;
+  protected transient Backup backup;
   protected String refreshTime;
   private transient Timer refresher;
 
+  private transient Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+  private boolean readOnlyData = false;
+  private int numInitCachedLines = -1;
+
   public CacheManager()
   {
     this.primary = new CacheStore();
@@ -69,6 +78,33 @@ public class CacheManager implements Closeable
 
   public void initialize() throws IOException
   {
+    Class[] parameters = new Class[1];
+    parameters[0] = CacheContext.class;
+
+    attributeMap.put(CacheContext.READ_ONLY_ATTR, readOnlyData);
+    attributeMap.put(CacheContext.NUM_INIT_CACHED_LINES_ATTR, numInitCachedLines);
+    CacheContext cacheContext = new CacheContext(attributeMap);
+
+    if (primary instanceof Component) {
+      try {
+        //check Component for method of type setup(CacheManager.CacheContext context)
+        primary.getClass().getMethod("setup", parameters);
+        ((Component)primary).setup(cacheContext);
+      } catch (NoSuchMethodException e) {
+        //Store implements Component, but not Component<CacheManager.CacheContext>
+      }
+    }
+
+    if (backup instanceof Component) {
+      try {
+        //check Component for method of type setup(CacheManager.CacheContext context)
+        backup.getClass().getMethod("setup", parameters);
+        ((Component)backup).setup(cacheContext);
+      } catch (NoSuchMethodException e) {
+        //Store implements Component, but not Component<CacheManager.CacheContext>
+      }
+    }
+
     primary.connect();
     backup.connect();
     Map<Object, Object> initialEntries = backup.loadInitialData();
@@ -76,7 +112,7 @@ public class CacheManager implements Closeable
       primary.putAll(initialEntries);
     }
 
-    if (!Strings.isNullOrEmpty(refreshTime)) {
+    if (!readOnlyData && !Strings.isNullOrEmpty(refreshTime)) {
 
       String[] parts = refreshTime.split("[:\\s]");
 
@@ -151,6 +187,7 @@ public class CacheManager implements Closeable
     this.primary = primary;
   }
 
+  @JsonIgnore
   public Primary getPrimary()
   {
     return primary;
@@ -161,6 +198,7 @@ public class CacheManager implements Closeable
     this.backup = backup;
   }
 
+  @JsonIgnore
   public Backup getBackup()
   {
     return backup;
@@ -182,6 +220,32 @@ public class CacheManager implements Closeable
     return refreshTime;
   }
 
+  public int getNumInitCachedLines()
+  {
+    return numInitCachedLines;
+  }
+
+  /**
+   * Sets the number of lines to cache during the initial load. The value is
+   * passed to the stores through CacheContext; the default is -1.
+   *
+   * @param numInitCachedLines number of lines to cache initially
+   */
+  public void setNumInitCachedLines(int numInitCachedLines)
+  {
+    this.numInitCachedLines = numInitCachedLines;
+  }
+
+  public boolean getReadOnlyData()
+  {
+    return readOnlyData;
+  }
+
+  public void setReadOnlyData(boolean isReadOnly)
+  {
+    readOnlyData = isReadOnly;
+  }
+
   /**
    * A primary store should also provide setting the value for a key.
    */
@@ -193,6 +257,7 @@ public class CacheManager implements Closeable
      *
      * @return all present keys.
      */
+    @JsonIgnore
     Set<Object> getKeys();
   }
 
@@ -210,6 +275,54 @@ public class CacheManager implements Closeable
     Map<Object, Object> loadInitialData();
   }
 
+
+  /**
+   * Used by {@link CacheManager} to pass properties to its stores which implement {@link Component}.
+   */
+  public static class CacheContext implements Context
+  {
+    public static final transient Attribute<Boolean> READ_ONLY_ATTR = new Attribute<>((false));
+    public static final transient Attribute<Integer> NUM_INIT_CACHED_LINES_ATTR = new Attribute<>((-1));
+
+
+    static {
+      Attribute.AttributeMap.AttributeInitializer.initialize(CacheContext.class);
+    }
+
+    Attribute.AttributeMap attributeMap;
+
+    public CacheContext(Attribute.AttributeMap attributeMap)
+    {
+      this.attributeMap = (Attribute.AttributeMap)(attributeMap == null ?
+          new Attribute.AttributeMap.DefaultAttributeMap() : attributeMap);
+    }
+
+    @Override
+    public Attribute.AttributeMap getAttributes()
+    {
+      return attributeMap;
+    }
+
+    @Override
+    public <T> T getValue(Attribute<T> key)
+    {
+      T attr = attributeMap.get(key);
+      return attr != null ? attr : key.defaultValue;
+    }
+
+    @Override
+    public void setCounters(Object counters)
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void sendMetrics(Collection<String> metricNames)
+    {
+      throw new UnsupportedOperationException();
+    }
+  }
+
   @SuppressWarnings("unused")
   private static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/118a75f5/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
index d356ee8..1b43085 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
@@ -29,10 +29,15 @@ import java.util.concurrent.TimeUnit;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 
+import com.datatorrent.api.Component;
+
 /**
  * A {@link CacheManager.Primary} which keeps key/value pairs in memory.<br/>
  *
@@ -46,7 +51,7 @@ import com.google.common.collect.Lists;
  *
  * @since 0.9.2
  */
-public class CacheStore implements CacheManager.Primary
+public class CacheStore implements CacheManager.Primary, Component<CacheManager.CacheContext>
 {
 
   @Min(0)
@@ -65,6 +70,9 @@ public class CacheStore implements CacheManager.Primary
   private transient Cache<Object, Object> cache;
   private transient boolean open;
 
+  private transient int numInitCacheLines = -1;
+
+  private static final Logger logger = LoggerFactory.getLogger(CacheStore.class);
 
   @Override
   public void put(Object key, Object value)
@@ -111,14 +119,26 @@ public class CacheStore implements CacheManager.Primary
   {
     open = true;
 
+    if (numInitCacheLines > maxCacheSize) {
+      logger.warn("numInitCacheLines = {} is greater than maxCacheSize = {}, maxCacheSize was set to {}", numInitCacheLines,
+          maxCacheSize, numInitCacheLines);
+      maxCacheSize = numInitCacheLines;
+    }
+
     CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
     cacheBuilder.maximumSize(maxCacheSize);
+
     if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_ACCESS) {
       cacheBuilder.expireAfterAccess(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS);
     } else if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_WRITE) {
       cacheBuilder.expireAfterWrite(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS);
     }
     cache = cacheBuilder.build();
+
+    if (entryExpiryStrategy == ExpiryType.NO_EVICTION) {
+      return;
+    }
+
     this.cleanupScheduler = Executors.newScheduledThreadPool(1);
     cleanupScheduler.scheduleAtFixedRate(new Runnable()
     {
@@ -140,7 +160,9 @@ public class CacheStore implements CacheManager.Primary
   public void disconnect() throws IOException
   {
     open = false;
-    cleanupScheduler.shutdown();
+    if (cleanupScheduler != null) {
+      cleanupScheduler.shutdown();
+    }
   }
 
   /**
@@ -184,6 +206,29 @@ public class CacheStore implements CacheManager.Primary
   }
 
   /**
+   * Callback to give the component a chance to perform tasks required as part of setting itself up.
+   * This callback is made exactly once during the operator lifetime.
+   *
+   * @param context - CacheContext with AttributeMap passed by CacheManager
+   */
+  @Override
+  public void setup(CacheManager.CacheContext context)
+  {
+    if (context != null) {
+      numInitCacheLines = context.getValue(CacheManager.CacheContext.NUM_INIT_CACHED_LINES_ATTR);
+      if (context.getValue(CacheManager.CacheContext.READ_ONLY_ATTR)) {
+        entryExpiryStrategy = ExpiryType.NO_EVICTION;
+      }
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  /**
    * Strategies for time-based expiration of entries.
    */
   public enum ExpiryType
@@ -197,6 +242,11 @@ public class CacheStore implements CacheManager.Primary
      * Expire the entries after the specified duration has passed since the entry was created, or the most recent
      * replacement of the value.
      */
-    EXPIRE_AFTER_WRITE
+    EXPIRE_AFTER_WRITE,
+
+    /**
+     * Entries never expire. No time-based expiry is set in the cache builder and no cache cleanup is scheduled.
+     */
+    NO_EVICTION
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/118a75f5/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java b/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
index caa02bf..0bbc021 100644
--- a/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
@@ -19,6 +19,7 @@
 package com.datatorrent.lib.db.cache;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -31,6 +32,8 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import com.datatorrent.api.Component;
+
 /**
  * Tests for {@link CacheManager}
  */
@@ -118,6 +121,40 @@ public class CacheManagerTest
     }
   }
 
+  private static class DummyComponentBackupStore extends DummyBackupStore implements Component<CacheManager.CacheContext>
+  {
+    CacheManager.CacheContext cacheContext;
+
+    @Override
+    public Map<Object, Object> loadInitialData()
+    {
+      Map<Object, Object> result = new HashMap();
+      int linesToCache = cacheContext.getValue(CacheManager.CacheContext.NUM_INIT_CACHED_LINES_ATTR);
+      int lineCount = 0;
+
+      for (Map.Entry entry : DummyBackupStore.backupMap.entrySet()) {
+        if (linesToCache > 0 && lineCount >= linesToCache) {
+          break;
+        }
+
+        result.put(entry.getKey(), DummyBackupStore.backupMap.get(entry.getKey()));
+        lineCount++;
+      }
+      return result;
+    }
+
+    @Override
+    public void setup(CacheManager.CacheContext context)
+    {
+      this.cacheContext = context;
+    }
+
+    @Override
+    public void teardown()
+    {
+    }
+  }
+
   @Test
   public void testCacheManager() throws IOException
   {
@@ -131,4 +168,23 @@ public class CacheManagerTest
     Assert.assertEquals("backup hit", "six", manager.get(6));
     Assert.assertEquals("primary updated- total", 6, manager.primary.getKeys().size());
   }
+
+  @Test
+  public void testCacheContextAttrInCacheManager() throws IOException, InterruptedException
+  {
+    CacheStore primaryCache = new CacheStore();
+    primaryCache.setEntryExpiryStrategy(CacheStore.ExpiryType.EXPIRE_AFTER_WRITE);
+    primaryCache.setEntryExpiryDurationInMillis(10);
+    primaryCache.setCacheCleanupInMillis(20);
+    CacheManager manager = new CacheManager();
+    manager.setBackup(new DummyComponentBackupStore());
+    manager.setNumInitCachedLines(3);
+    manager.setReadOnlyData(true);
+    manager.setPrimary(primaryCache);
+    manager.initialize();
+
+    Assert.assertEquals("initial number of cached lines", 3, manager.primary.getKeys().size());
+    Thread.sleep(30);
+    Assert.assertEquals("not evicted number of cached lines", 3, manager.primary.getKeys().size());
+  }
 }