You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/05/18 20:41:01 UTC
apex-malhar git commit: APEXMALHAR-2473 Support for global cache meta
information in db CacheManager
Repository: apex-malhar
Updated Branches:
refs/heads/master c3f86f237 -> 118a75f5e
APEXMALHAR-2473 Support for global cache meta information in db CacheManager
1. Uses the Component interface and the new CacheContext class to pass properties to the stores by calling setup(CacheContext).
2. APEXMALHAR-2474: FSLoader implements Component to get numInitCachedLines from CacheManager and uses it to limit the initial load.
Implements get() so that entries not loaded during the initial load can still be looked up from the file.
3. APEXMALHAR-2475: CacheStore implements Component so that readOnly and numInitCachedLines can be passed to it.
Adds a NO_EVICTION expiry strategy, which setup(CacheContext) selects when readOnly is true.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/118a75f5
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/118a75f5
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/118a75f5
Branch: refs/heads/master
Commit: 118a75f5e78fb7d125f84f996f0bb3c157923d25
Parents: c3f86f2
Author: Oliver Winke <ol...@datatorrent.com>
Authored: Mon Apr 17 11:27:01 2017 -0700
Committer: Oliver Winke <ol...@datatorrent.com>
Committed: Thu May 18 11:00:55 2017 -0700
----------------------------------------------------------------------
.../contrib/enrich/AbstractEnricher.java | 21 +++-
.../datatorrent/contrib/enrich/FSLoader.java | 92 ++++++++++----
.../datatorrent/lib/db/cache/CacheManager.java | 119 ++++++++++++++++++-
.../datatorrent/lib/db/cache/CacheStore.java | 56 ++++++++-
.../lib/db/cache/CacheManagerTest.java | 56 +++++++++
5 files changed, 312 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/118a75f5/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
index f2f8421..de9e2c7 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
@@ -73,10 +73,11 @@ public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator imple
/**
* Helper variables.
*/
- private transient CacheManager cacheManager;
protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
+ private CacheManager cacheManager = new NullValuesCacheManager();
+
/**
* This method needs to be called by implementing class for processing a tuple for enrichment.
* The method will take the tuple through following stages:
@@ -154,7 +155,6 @@ public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator imple
{
super.setup(context);
- cacheManager = new NullValuesCacheManager();
CacheStore primaryCache = new CacheStore();
// set expiration to one day.
@@ -185,13 +185,18 @@ public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator imple
try {
cacheManager.initialize();
} catch (IOException e) {
- throw new RuntimeException("Unable to initialize primary cache", e);
+ throw new RuntimeException("Unable to initialize cache manager", e);
}
}
@Override
public void deactivate()
{
+ try {
+ cacheManager.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to close cache manager", e);
+ }
}
/**
@@ -320,4 +325,14 @@ public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator imple
{
this.cacheSize = cacheSize;
}
+
+ public CacheManager getCacheManager()
+ {
+ return cacheManager;
+ }
+
+ public void setCacheManager(CacheManager cacheManager)
+ {
+ this.cacheManager = cacheManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/118a75f5/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
index 464fa99..e04d6c4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
@@ -28,7 +28,6 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -37,6 +36,8 @@ import org.apache.hadoop.fs.Path;
import com.esotericsoftware.kryo.NotNull;
import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Component;
import com.datatorrent.lib.db.cache.CacheManager;
import com.datatorrent.lib.util.FieldInfo;
@@ -51,7 +52,7 @@ import com.datatorrent.lib.util.FieldInfo;
* @since 3.4.0
*/
@InterfaceStability.Evolving
-public abstract class FSLoader extends ReadOnlyBackup
+public abstract class FSLoader extends ReadOnlyBackup implements Component<CacheManager.CacheContext>
{
@NotNull
private String fileName;
@@ -59,6 +60,7 @@ public abstract class FSLoader extends ReadOnlyBackup
private transient Path filePath;
private transient FileSystem fs;
private transient boolean connected;
+ private transient int numInitCachedLines = -1;
private static final Logger logger = LoggerFactory.getLogger(FSLoader.class);
@@ -75,40 +77,29 @@ public abstract class FSLoader extends ReadOnlyBackup
@Override
public Map<Object, Object> loadInitialData()
{
- Map<Object, Object> result = null;
- FSDataInputStream in = null;
- BufferedReader bin = null;
- try {
+ Map<Object, Object> result;
+ try (
+ FSDataInputStream in = fs.open(filePath);
+ BufferedReader bin = new BufferedReader(new InputStreamReader(in));
+ ) {
result = Maps.newHashMap();
- in = fs.open(filePath);
- bin = new BufferedReader(new InputStreamReader(in));
String line;
- while ((line = bin.readLine()) != null) {
+ int linesCount = 0;
+ while ((line = bin.readLine()) != null && (numInitCachedLines < 0 || linesCount < numInitCachedLines)) {
try {
Map<String, Object> tuple = extractFields(line);
if (tuple != null && !tuple.isEmpty()) {
result.put(getKey(tuple), getValue(tuple));
}
} catch (Exception parseExp) {
- logger.info("Unable to parse line {}", line);
+ throw new RuntimeException(parseExp);
}
+ ++linesCount;
}
} catch (IOException e) {
throw new RuntimeException(e);
- } finally {
- if (bin != null) {
- IOUtils.closeQuietly(bin);
- }
- if (in != null) {
- IOUtils.closeQuietly(in);
- }
- try {
- fs.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
}
- logger.debug("loading initial data {}", result.size());
+ logger.debug("loading initial data, size: {}", result.size());
return result;
}
@@ -145,9 +136,44 @@ public abstract class FSLoader extends ReadOnlyBackup
@Override
public Object get(Object key)
{
+ try (
+ FSDataInputStream in = openWithRetryFsInputStream(filePath);
+ BufferedReader bin = new BufferedReader(new InputStreamReader(in));
+ ) {
+ String line;
+ while ((line = bin.readLine()) != null) {
+ try {
+ Map<String, Object> tuple = extractFields(line);
+ if (tuple != null && !tuple.isEmpty() && getKey(tuple).equals(key)) {
+ logger.debug("Found line in FS {}", getValue(tuple));
+ return getValue(tuple);
+ }
+ } catch (Exception parseExp) {
+ throw new RuntimeException(parseExp);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
return null;
}
+ private FSDataInputStream openWithRetryFsInputStream(Path filePath)
+ {
+ FSDataInputStream in;
+ try {
+ in = fs.open(filePath);
+ } catch (IOException e) {
+ try {
+ this.connect();
+ in = fs.open(filePath);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ return in;
+ }
+
@Override
public List<Object> getAll(List<Object> keys)
{
@@ -179,4 +205,24 @@ public abstract class FSLoader extends ReadOnlyBackup
{
return connected;
}
+
+ /**
+ * Reads numInitCachedLines from the CacheContext; loadInitialData() caches at most that many lines (negative = all).
+ * Called by CacheManager.initialize() before connect(), once per initialize() call.
+ *
+ * @param context CacheContext with the AttributeMap passed by CacheManager; ignored if null
+ */
+ @Override
+ public void setup(CacheManager.CacheContext context)
+ {
+ if (context != null) {
+ numInitCachedLines = context.getValue(CacheManager.CacheContext.NUM_INIT_CACHED_LINES_ATTR);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/118a75f5/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java b/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
index 2ea31f1..4724761 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
@@ -21,6 +21,7 @@ package com.datatorrent.lib.db.cache;
import java.io.Closeable;
import java.io.IOException;
import java.util.Calendar;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -31,12 +32,16 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
import com.datatorrent.lib.db.KeyValueStore;
/**
@@ -56,12 +61,16 @@ import com.datatorrent.lib.db.KeyValueStore;
public class CacheManager implements Closeable
{
@NotNull
- protected Primary primary;
+ protected transient Primary primary;
@NotNull
- protected Backup backup;
+ protected transient Backup backup;
protected String refreshTime;
private transient Timer refresher;
+ private transient Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ private boolean readOnlyData = false;
+ private int numInitCachedLines = -1;
+
public CacheManager()
{
this.primary = new CacheStore();
@@ -69,6 +78,33 @@ public class CacheManager implements Closeable
public void initialize() throws IOException
{
+ Class[] parameters = new Class[1];
+ parameters[0] = CacheContext.class;
+
+ attributeMap.put(CacheContext.READ_ONLY_ATTR, readOnlyData);
+ attributeMap.put(CacheContext.NUM_INIT_CACHED_LINES_ATTR, numInitCachedLines);
+ CacheContext cacheContext = new CacheContext(attributeMap);
+
+ if (primary instanceof Component) {
+ try {
+ //check Component for method of type setup(CacheManager.CacheContext context)
+ primary.getClass().getMethod("setup", parameters);
+ ((Component)primary).setup(cacheContext);
+ } catch (NoSuchMethodException e) {
+ //Store implements Component, but not Component<CacheManager.CacheContext>
+ }
+ }
+
+ if (backup instanceof Component) {
+ try {
+ //check Component for method of type setup(CacheManager.CacheContext context)
+ backup.getClass().getMethod("setup", parameters);
+ ((Component)backup).setup(cacheContext);
+ } catch (NoSuchMethodException e) {
+ //Store implements Component, but not Component<CacheManager.CacheContext>
+ }
+ }
+
primary.connect();
backup.connect();
Map<Object, Object> initialEntries = backup.loadInitialData();
@@ -76,7 +112,7 @@ public class CacheManager implements Closeable
primary.putAll(initialEntries);
}
- if (!Strings.isNullOrEmpty(refreshTime)) {
+ if (!readOnlyData && !Strings.isNullOrEmpty(refreshTime)) {
String[] parts = refreshTime.split("[:\\s]");
@@ -151,6 +187,7 @@ public class CacheManager implements Closeable
this.primary = primary;
}
+ @JsonIgnore
public Primary getPrimary()
{
return primary;
@@ -161,6 +198,7 @@ public class CacheManager implements Closeable
this.backup = backup;
}
+ @JsonIgnore
public Backup getBackup()
{
return backup;
@@ -182,6 +220,32 @@ public class CacheManager implements Closeable
return refreshTime;
}
+ public int getNumInitCachedLines()
+ {
+ return numInitCachedLines;
+ }
+
+ /**
+ * Sets how many lines the backup store should load into the primary cache during the initial load.
+ * The value is passed to the stores through CacheContext; FSLoader treats a negative value (default -1) as no limit.
+ *
+ * @param numInitCachedLines maximum number of lines to load initially; negative for no limit
+ */
+ public void setNumInitCachedLines(int numInitCachedLines)
+ {
+ this.numInitCachedLines = numInitCachedLines;
+ }
+
+ public boolean getReadOnlyData()
+ {
+ return readOnlyData;
+ }
+
+ public void setReadOnlyData(boolean isReadOnly)
+ {
+ readOnlyData = isReadOnly;
+ }
+
/**
* A primary store should also provide setting the value for a key.
*/
@@ -193,6 +257,7 @@ public class CacheManager implements Closeable
*
* @return all present keys.
*/
+ @JsonIgnore
Set<Object> getKeys();
}
@@ -210,6 +275,54 @@ public class CacheManager implements Closeable
Map<Object, Object> loadInitialData();
}
+
+ /**
+ * Used by {@link CacheManager} to pass properties to its stores which implement {@link Component}.
+ */
+ public static class CacheContext implements Context
+ {
+ public static final transient Attribute<Boolean> READ_ONLY_ATTR = new Attribute<>((false));
+ public static final transient Attribute<Integer> NUM_INIT_CACHED_LINES_ATTR = new Attribute<>((-1));
+
+
+ static {
+ Attribute.AttributeMap.AttributeInitializer.initialize(CacheContext.class);
+ }
+
+ Attribute.AttributeMap attributeMap;
+
+ public CacheContext(Attribute.AttributeMap attributeMap)
+ {
+ this.attributeMap = (Attribute.AttributeMap)(attributeMap == null ?
+ new Attribute.AttributeMap.DefaultAttributeMap() : attributeMap);
+ }
+
+ @Override
+ public Attribute.AttributeMap getAttributes()
+ {
+ return attributeMap;
+ }
+
+ @Override
+ public <T> T getValue(Attribute<T> key)
+ {
+ T attr = attributeMap.get(key);
+ return attr != null ? attr : key.defaultValue;
+ }
+
+ @Override
+ public void setCounters(Object counters)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void sendMetrics(Collection<String> metricNames)
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/118a75f5/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
index d356ee8..1b43085 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
@@ -29,10 +29,15 @@ import java.util.concurrent.TimeUnit;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
+import com.datatorrent.api.Component;
+
/**
* A {@link CacheManager.Primary} which keeps key/value pairs in memory.<br/>
*
@@ -46,7 +51,7 @@ import com.google.common.collect.Lists;
*
* @since 0.9.2
*/
-public class CacheStore implements CacheManager.Primary
+public class CacheStore implements CacheManager.Primary, Component<CacheManager.CacheContext>
{
@Min(0)
@@ -65,6 +70,9 @@ public class CacheStore implements CacheManager.Primary
private transient Cache<Object, Object> cache;
private transient boolean open;
+ private transient int numInitCacheLines = -1;
+
+ private static final Logger logger = LoggerFactory.getLogger(CacheStore.class);
@Override
public void put(Object key, Object value)
@@ -111,14 +119,26 @@ public class CacheStore implements CacheManager.Primary
{
open = true;
+ if (numInitCacheLines > maxCacheSize) {
+ logger.warn("numInitCacheLines = {} is greater than maxCacheSize = {}, maxCacheSize was set to {}", numInitCacheLines,
+ maxCacheSize, numInitCacheLines);
+ maxCacheSize = numInitCacheLines;
+ }
+
CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
cacheBuilder.maximumSize(maxCacheSize);
+
if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_ACCESS) {
cacheBuilder.expireAfterAccess(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS);
} else if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_WRITE) {
cacheBuilder.expireAfterWrite(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS);
}
cache = cacheBuilder.build();
+
+ if (entryExpiryStrategy == ExpiryType.NO_EVICTION) {
+ return;
+ }
+
this.cleanupScheduler = Executors.newScheduledThreadPool(1);
cleanupScheduler.scheduleAtFixedRate(new Runnable()
{
@@ -140,7 +160,9 @@ public class CacheStore implements CacheManager.Primary
public void disconnect() throws IOException
{
open = false;
- cleanupScheduler.shutdown();
+ if (cleanupScheduler != null) {
+ cleanupScheduler.shutdown();
+ }
}
/**
@@ -184,6 +206,29 @@ public class CacheStore implements CacheManager.Primary
}
/**
+ * Called by CacheManager.initialize() before connect(). Reads the initial-load line count (connect() raises
+ * maxCacheSize to it if larger) and the readOnly flag (true switches the expiry strategy to NO_EVICTION).
+ *
+ * @param context - CacheContext with AttributeMap passed by CacheManager; ignored if null
+ */
+ @Override
+ public void setup(CacheManager.CacheContext context)
+ {
+ if (context != null) {
+ numInitCacheLines = context.getValue(CacheManager.CacheContext.NUM_INIT_CACHED_LINES_ATTR);
+ if (context.getValue(CacheManager.CacheContext.READ_ONLY_ATTR)) {
+ entryExpiryStrategy = ExpiryType.NO_EVICTION;
+ }
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+
+ }
+
+ /**
* Strategies for time-based expiration of entries.
*/
public enum ExpiryType
@@ -197,6 +242,11 @@ public class CacheStore implements CacheManager.Primary
* Expire the entries after the specified duration has passed since the entry was created, or the most recent
* replacement of the value.
*/
- EXPIRE_AFTER_WRITE
+ EXPIRE_AFTER_WRITE,
+
+ /**
+ * No time-based expiry and no scheduled cleanup; size-based eviction via maxCacheSize still applies.
+ */
+ NO_EVICTION
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/118a75f5/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java b/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
index caa02bf..0bbc021 100644
--- a/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
@@ -19,6 +19,7 @@
package com.datatorrent.lib.db.cache;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,6 +32,8 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.datatorrent.api.Component;
+
/**
* Tests for {@link CacheManager}
*/
@@ -118,6 +121,40 @@ public class CacheManagerTest
}
}
+ private static class DummyComponentBackupStore extends DummyBackupStore implements Component<CacheManager.CacheContext>
+ {
+ CacheManager.CacheContext cacheContext;
+
+ @Override
+ public Map<Object, Object> loadInitialData()
+ {
+ Map<Object, Object> result = new HashMap();
+ int linesToCache = cacheContext.getValue(CacheManager.CacheContext.NUM_INIT_CACHED_LINES_ATTR);
+ int lineCount = 0;
+
+ for (Map.Entry entry : DummyBackupStore.backupMap.entrySet()) {
+ if (linesToCache > 0 && lineCount >= linesToCache) {
+ break;
+ }
+
+ result.put(entry.getKey(), DummyBackupStore.backupMap.get(entry.getKey()));
+ lineCount++;
+ }
+ return result;
+ }
+
+ @Override
+ public void setup(CacheManager.CacheContext context)
+ {
+ this.cacheContext = context;
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+ }
+
@Test
public void testCacheManager() throws IOException
{
@@ -131,4 +168,23 @@ public class CacheManagerTest
Assert.assertEquals("backup hit", "six", manager.get(6));
Assert.assertEquals("primary updated- total", 6, manager.primary.getKeys().size());
}
+
+ @Test
+ public void testCacheContextAttrInCacheManager() throws IOException, InterruptedException
+ {
+ CacheStore primaryCache = new CacheStore();
+ primaryCache.setEntryExpiryStrategy(CacheStore.ExpiryType.EXPIRE_AFTER_WRITE);
+ primaryCache.setEntryExpiryDurationInMillis(10);
+ primaryCache.setCacheCleanupInMillis(20);
+ CacheManager manager = new CacheManager();
+ manager.setBackup(new DummyComponentBackupStore());
+ manager.setNumInitCachedLines(3);
+ manager.setReadOnlyData(true);
+ manager.setPrimary(primaryCache);
+ manager.initialize();
+
+ Assert.assertEquals("initial number of cached lines", 3, manager.primary.getKeys().size());
+ Thread.sleep(30);
+ Assert.assertEquals("not evicted number of cached lines", 3, manager.primary.getKeys().size());
+ }
}