You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sa...@apache.org on 2016/05/10 09:05:57 UTC

[3/4] incubator-apex-malhar git commit: APEXMALHAR-2023 Adding Enrichment Operator to Malhar

APEXMALHAR-2023 Adding Enrichment Operator to Malhar

Added 2 operators, POJOEnricher and MapEnricher, which enrich the given POJO or map as configured.
The operators are marked evolving.
Test cases added for both operators.

2 Backend loaders are added for File and JDBC.
HBase loader removed as there are some things that need to be discussed; its implementation is postponed until after that discussion.

Added a new SupportType, OBJECT, to FieldInfo.
Added a static method in FieldInfo for converting a Class to its SupportType.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9600eddd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9600eddd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9600eddd

Branch: refs/heads/master
Commit: 9600eddd8ee4fa010cbb4799da74e89d184daa40
Parents: 528e7ac
Author: chinmaykolhatkar <ch...@datatorrent.com>
Authored: Tue Mar 29 14:46:54 2016 +0530
Committer: chinmaykolhatkar <ch...@datatorrent.com>
Committed: Fri Apr 29 22:59:48 2016 +0530

----------------------------------------------------------------------
 .../contrib/enrich/AbstractEnricher.java        | 321 +++++++++++++++++++
 .../contrib/enrich/BackendLoader.java           |  42 +++
 .../datatorrent/contrib/enrich/FSLoader.java    | 184 +++++++++++
 .../datatorrent/contrib/enrich/JDBCLoader.java  | 201 ++++++++++++
 .../datatorrent/contrib/enrich/MapEnricher.java | 138 ++++++++
 .../contrib/enrich/NullValuesCacheManager.java  |  60 ++++
 .../contrib/enrich/POJOEnricher.java            | 287 +++++++++++++++++
 .../contrib/enrich/ReadOnlyBackup.java          |  61 ++++
 .../enrichment/AbstractEnrichmentOperator.java  | 170 ----------
 .../contrib/enrichment/EnrichmentBackup.java    |  18 --
 .../contrib/enrichment/FSLoader.java            | 146 ---------
 .../contrib/enrichment/HBaseLoader.java         | 128 --------
 .../contrib/enrichment/JDBCLoader.java          | 158 ---------
 .../enrichment/MapEnrichmentOperator.java       |  57 ----
 .../enrichment/NullValuesCacheManager.java      |  43 ---
 .../enrichment/POJOEnrichmentOperator.java      | 185 -----------
 .../contrib/enrichment/ReadOnlyBackup.java      |  38 ---
 .../contrib/enrichment/package-info.java        |   1 -
 .../contrib/enrich/EmployeeOrder.java           | 114 +++++++
 .../contrib/enrich/FileEnrichmentTest.java      | 103 ++++++
 .../contrib/enrich/JDBCLoaderTest.java          | 209 ++++++++++++
 .../contrib/enrich/MapEnricherTest.java         | 251 +++++++++++++++
 .../com/datatorrent/contrib/enrich/Order.java   |  71 ++++
 .../contrib/enrich/POJOEnricherTest.java        | 232 ++++++++++++++
 .../enrichment/BeanEnrichmentOperatorTest.java  |  95 ------
 .../contrib/enrichment/EmployeeOrder.java       |  95 ------
 .../contrib/enrichment/FileEnrichmentTest.java  |  75 -----
 .../contrib/enrichment/HBaseLoaderTest.java     | 162 ----------
 .../contrib/enrichment/JDBCLoaderTest.java      | 179 -----------
 .../enrichment/MapEnrichmentOperatorTest.java   | 152 ---------
 contrib/src/test/resources/productmapping.txt   | 100 ++++++
 .../com/datatorrent/lib/util/FieldInfo.java     |  15 +-
 32 files changed, 2388 insertions(+), 1703 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
new file mode 100644
index 0000000..cdefddf
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.NotNull;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.db.cache.CacheManager;
+import com.datatorrent.lib.db.cache.CacheStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+
+/**
+ * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
+ * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
+ * <p>
+ * Properties:<br>
+ * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
+ * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
+ * <b>store</b>: Specify the type of loader for looking data<br>
+ * <br>
+ *
+ * @param <INPUT>  Type of tuples which are received by this operator
+ * @param <OUTPUT> Type of tuples which are emitted by this operator
+ * @displayName Abstract Enrichment Operator
+ * @tags Enrichment
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
+{
+  /**
+   * Mandatory parameters for Enricher
+   */
+  @NotNull
+  protected List<String> lookupFields;
+  @NotNull
+  protected List<String> includeFields;
+  @NotNull
+  private BackendLoader store;
+
+  /**
+   * Optional parameters for enricher.
+   */
+  private int cacheExpirationInterval = 1 * 60 * 60 * 1000;  // 1 hour
+  private int cacheCleanupInterval = 1 * 60 * 60 * 1000; // 1 hour
+  private int cacheSize = 1024; // 1024 records
+
+  /**
+   * Helper variables.
+   */
+  private transient CacheManager cacheManager;
+  protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
+  protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
+
+  /**
+   * Runs a single tuple through the enrichment pipeline. Implementing classes call this for each tuple.
+   * <ol>
+   * <li>{@link #getKey(Object)} extracts the lookup key; a tuple for which it returns null is dropped</li>
+   * <li>The key is handed to the cache manager and whatever it returns is used as the enrichment data</li>
+   * <li>{@link #convert(Object, Object)} combines the input tuple with that data</li>
+   * <li>A non-null conversion result is passed on to {@link #emitEnrichedTuple(Object)}</li>
+   * </ol>
+   *
+   * @param tuple Input tuple that needs to get processed for enrichment.
+   */
+  protected void enrichTuple(INPUT tuple)
+  {
+    Object lookupKey = getKey(tuple);
+    if (lookupKey == null) {
+      return;
+    }
+
+    OUTPUT enriched = convert(tuple, cacheManager.get(lookupKey));
+    if (enriched == null) {
+      return;
+    }
+    emitEnrichedTuple(enriched);
+  }
+
+  /**
+   * The method should be implemented by concrete class which returns an ArrayList<Object> containing all the fields
+   * which forms key part of lookup.
+   * The order of field values should be same as the one set in {@link #lookupFields} variable.
+   *
+   * @param tuple Input tuple from which fields values for key needs to be fetched.
+   * @return Should return ArrayList<Object> which has fields values forming keys in same order as {@link #lookupFields}
+   */
+  protected abstract Object getKey(INPUT tuple);
+
+  /**
+   * The method should be implemented by concrete class.
+   * This method is expected to take input tuple and an externally fetched object containing fields to be enriched, and
+   * return an enriched tuple which is ready to be emitted.
+   *
+   * @param in     Input tuple which needs to be enriched.
+   * @param cached ArrayList<Object> containing missing data retrieved from external sources.
+   * @return Enriched tuple of type OUTPUT
+   */
+  protected abstract OUTPUT convert(INPUT in, Object cached);
+
+  /**
+   * This method should be implemented by concrete class.
+   * The method is expected to emit tuple of type OUTPUT
+   *
+   * @param tuple Tuple of type OUTPUT that should be emitted.
+   */
+  protected abstract void emitEnrichedTuple(OUTPUT tuple);
+
+  /**
+   * This method should be implemented by concrete method.
+   * The method should return Class type of field for given fieldName from output tuple.
+   *
+   * @param fieldName Field name for which field type needs to be identified
+   * @return Class type for given field.
+   */
+  protected abstract Class<?> getIncludeFieldType(String fieldName);
+
+  /**
+   * This method should be implemented by concrete method.
+   * The method should return Class type of field for given fieldName from input tuple.
+   *
+   * @param fieldName Field name for which field type needs to be identified
+   * @return Class type for given field.
+   */
+  protected abstract Class<?> getLookupFieldType(String fieldName);
+
+  /**
+   * Builds the cache manager used for lookups: a {@link CacheStore} configured from the
+   * {@code cacheExpirationInterval}, {@code cacheCleanupInterval} and {@code cacheSize} properties becomes the
+   * primary store, and the configured backend {@code store} becomes the backup.
+   *
+   * @param context Operator context, forwarded to the parent implementation.
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+
+    // Expiry and cleanup intervals come from the operator properties (both default to 1 hour).
+    CacheStore expiringCache = new CacheStore();
+    expiringCache.setEntryExpiryDurationInMillis(cacheExpirationInterval);
+    expiringCache.setCacheCleanupInMillis(cacheCleanupInterval);
+    expiringCache.setEntryExpiryStrategy(CacheStore.ExpiryType.EXPIRE_AFTER_WRITE);
+    expiringCache.setMaxCacheSize(cacheSize);
+
+    cacheManager = new NullValuesCacheManager();
+    cacheManager.setPrimary(expiringCache);
+    cacheManager.setBackup(store);
+  }
+
+  /**
+   * Resolves a {@link FieldInfo} for every lookup field and include field, hands both lists to the backend
+   * store and then initializes the cache manager.
+   *
+   * @param context Activation context; not used by this implementation.
+   * @throws RuntimeException if initializing the cache manager fails with an {@link IOException}
+   */
+  @Override
+  public void activate(Context context)
+  {
+    for (String field : lookupFields) {
+      SupportType keyType = SupportType.getFromJavaType(getLookupFieldType(field));
+      lookupFieldInfo.add(new FieldInfo(field, field, keyType));
+    }
+
+    // Unlike lookupFields, a missing includeFields list is tolerated here.
+    if (includeFields != null) {
+      for (String field : includeFields) {
+        SupportType valueType = SupportType.getFromJavaType(getIncludeFieldType(field));
+        includeFieldInfo.add(new FieldInfo(field, field, valueType));
+      }
+    }
+
+    store.setFieldInfo(lookupFieldInfo, includeFieldInfo);
+
+    try {
+      cacheManager.initialize();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to initialize primary cache", e);
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+  }
+
+  /**
+   * Returns a list of fields which are used for lookup.
+   *
+   * @return List of fields
+   */
+  public List<String> getLookupFields()
+  {
+    return lookupFields;
+  }
+
+  /**
+   * Set fields on which lookup needs to happen in external store.
+   * This is a mandatory parameter.
+   *
+   * @param lookupFields List of fields on which lookup happens.
+   */
+  public void setLookupFields(List<String> lookupFields)
+  {
+    this.lookupFields = lookupFields;
+  }
+
+  /**
+   * Returns a list of fields using which tuple is enriched
+   *
+   * @return List of fields.
+   */
+  public List<String> getIncludeFields()
+  {
+    return includeFields;
+  }
+
+  /**
+   * Sets list of fields to be fetched from external store for enriching the tuple.
+   * This is a mandatory parameter.
+   *
+   * @param includeFields List of fields.
+   */
+  public void setIncludeFields(List<String> includeFields)
+  {
+    this.includeFields = includeFields;
+  }
+
+  /**
+   * Returns the backend store which will enrich the tuple.
+   *
+   * @return Object of type {@link BackendLoader}
+   */
+  public BackendLoader getStore()
+  {
+    return store;
+  }
+
+  /**
+   * Sets backend store which will enrich the tuple.
+   * This is a mandatory parameter.
+   *
+   * @param store Object of type {@link BackendLoader}
+   */
+  public void setStore(BackendLoader store)
+  {
+    this.store = store;
+  }
+
+  /**
+   * Returns cache entry expiration interval in ms.
+   * This is an optional parameter.
+   *
+   * @return Cache entry expiration interval in ms
+   */
+  public int getCacheExpirationInterval()
+  {
+    return cacheExpirationInterval;
+  }
+
+  /**
+   * Sets cache entry expiration interval in ms.
+   * This is an optional parameter.
+   *
+   * @param cacheExpirationInterval Cache entry expiration interval in ms
+   */
+  public void setCacheExpirationInterval(int cacheExpirationInterval)
+  {
+    this.cacheExpirationInterval = cacheExpirationInterval;
+  }
+
+  /**
+   * Returns cache cleanup interval in ms. After this interval, cache cleanup operation will be performed.
+   * This is an optional parameter.
+   *
+   * @return cache cleanup interval in ms.
+   */
+  public int getCacheCleanupInterval()
+  {
+    return cacheCleanupInterval;
+  }
+
+  /**
+   * Set Cache cleanup interval in ms. After this interval, cache cleanup operation will be performed.
+   * This is an optional parameter.
+   *
+   * @param cacheCleanupInterval cache cleanup interval in ms.
+   */
+  public void setCacheCleanupInterval(int cacheCleanupInterval)
+  {
+    this.cacheCleanupInterval = cacheCleanupInterval;
+  }
+
+  /**
+   * Get size (number of entries) of cache.
+   *
+   * @return Number of entries allowed in cache.
+   */
+  public int getCacheSize()
+  {
+    return cacheSize;
+  }
+
+  /**
+   * Set size (number of entries) of cache.
+   *
+   * @param cacheSize Number of entries allowed in cache.
+   */
+  public void setCacheSize(int cacheSize)
+  {
+    this.cacheSize = cacheSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/BackendLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/BackendLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/BackendLoader.java
new file mode 100644
index 0000000..9570329
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/BackendLoader.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.db.cache.CacheManager;
+import com.datatorrent.lib.util.FieldInfo;
+
+/**
+ * Interface for store to be used in enrichment
+ */
+@InterfaceStability.Evolving
+public interface BackendLoader extends CacheManager.Backup
+{
+  /**
+   * Set {@link FieldInfo} for lookup fields and also include fields.
+   * Calling this method is mandatory for correct functioning of backend loader.
+   *
+   * @param lookupFieldInfo  List of {@link FieldInfo} that will be used as key in lookup.
+   * @param includeFieldInfo List of {@link FieldInfo} that will be retrieved from store.
+   */
+  void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
new file mode 100644
index 0000000..71d3dce
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.esotericsoftware.kryo.NotNull;
+import com.google.common.collect.Maps;
+import com.datatorrent.lib.db.cache.CacheManager;
+import com.datatorrent.lib.util.FieldInfo;
+
+
+/**
+ * This implementation of {@link BackendLoader} loads the data from a given file into memory cache and serves queries
+ * from the cache.
+ * When this is set as primaryCache in {@link CacheManager}, CacheManager can call {@link #loadInitialData()}
+ * periodically to reload the file.
+ * <p>
+ * The format of the input file is:
+ * <p>
+ * {"productCategory": 5, "productId": 0}
+ * {"productCategory": 4, "productId": 1}
+ * {"productCategory": 5, "productId": 2}
+ * {"productCategory": 5, "productId": 3}
+ * </p>
+ * Each line in the input file should be a valid json object which represents a record and each key/value pair in that
+ * json object represents the fields/value.
+ * <p>
+ * NOTE: This loader should be used with caution as all the data present in the file is loaded in memory because of
+ * which the memory consumption may go up.
+ */
+@InterfaceStability.Evolving
+public class FSLoader extends ReadOnlyBackup
+{
+  @NotNull
+  private String fileName;
+
+  private transient Path filePath;
+  private transient FileSystem fs;
+  private transient boolean connected;
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+  private static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>(){});
+  private static final Logger logger = LoggerFactory.getLogger(FSLoader.class);
+
+  public String getFileName()
+  {
+    return fileName;
+  }
+
+  public void setFileName(String fileName)
+  {
+    this.fileName = fileName;
+  }
+
+  /**
+   * Reads the configured file line by line and builds the lookup table that is handed to the cache.
+   * Every line is parsed as one JSON object; the values of the lookup fields form the map key and the values
+   * of the include fields form the map value. Lines that cannot be parsed as JSON are logged and skipped.
+   * <p>
+   * The {@link FileSystem} handle is deliberately left open: it is owned by {@link #connect()} and
+   * {@link #disconnect()}, and closing it here would make any later reload of the file fail.
+   *
+   * @return Map from lookup key (list of lookup field values) to the list of include field values.
+   * @throws RuntimeException if the file cannot be opened or read
+   */
+  @Override
+  public Map<Object, Object> loadInitialData()
+  {
+    Map<Object, Object> result = Maps.newHashMap();
+    FSDataInputStream in = null;
+    BufferedReader bin = null;
+    try {
+      in = fs.open(filePath);
+      // JSON text is expected to be UTF-8; do not depend on the platform default charset.
+      bin = new BufferedReader(new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8));
+      String line;
+      while ((line = bin.readLine()) != null) {
+        try {
+          Map<String, Object> tuple = reader.readValue(line);
+          result.put(getKey(tuple), getValue(tuple));
+        } catch (JsonProcessingException parseExp) {
+          logger.info("Unable to parse line {}", line);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      // closeQuietly is null-safe, so no explicit null checks are needed.
+      IOUtils.closeQuietly(bin);
+      IOUtils.closeQuietly(in);
+    }
+    logger.debug("loading initial data {}", result.size());
+    return result;
+  }
+
+  /**
+   * Picks the include-field values out of one parsed record.
+   *
+   * @param tuple One parsed record of the input file, keyed by field name.
+   * @return List holding the value of every include field, in {@code includeFieldInfo} order; a field that is
+   *         absent from the record contributes {@code null}.
+   */
+  private Object getValue(Map<String, Object> tuple)
+  {
+    List<Object> values = new ArrayList<Object>();
+    for (FieldInfo field : includeFieldInfo) {
+      String column = field.getColumnName();
+      values.add(tuple.get(column));
+    }
+    return values;
+  }
+
+  /**
+   * Builds the lookup key of one parsed record.
+   *
+   * @param tuple One parsed record of the input file, keyed by field name.
+   * @return List holding the value of every lookup field, in {@code lookupFieldInfo} order; a field that is
+   *         absent from the record contributes {@code null}.
+   */
+  private Object getKey(Map<String, Object> tuple)
+  {
+    List<Object> keyParts = new ArrayList<Object>();
+    for (FieldInfo field : lookupFieldInfo) {
+      String column = field.getColumnName();
+      keyParts.add(tuple.get(column));
+    }
+    return keyParts;
+  }
+
+  /**
+   * Single-key lookup against this loader; this implementation always returns {@code null}.
+   * NOTE(review): presumably lookups are expected to be answered from the data returned by
+   * {@link #loadInitialData()} rather than by this method - confirm against the cache manager.
+   *
+   * @param key Lookup key; ignored.
+   * @return Always {@code null}.
+   */
+  @Override
+  public Object get(Object key)
+  {
+    return null;
+  }
+
+  /**
+   * Multi-key lookup against this loader; this implementation always returns {@code null} (not an empty list).
+   *
+   * @param keys Lookup keys; ignored.
+   * @return Always {@code null}.
+   */
+  @Override
+  public List<Object> getAll(List<Object> keys)
+  {
+    return null;
+  }
+
+  @Override
+  public void connect() throws IOException
+  {
+    Configuration conf = new Configuration();
+    this.filePath = new Path(fileName);
+    this.fs = FileSystem.newInstance(filePath.toUri(), conf);
+    if (!fs.isFile(filePath)) {
+      throw new IOException("Provided path " + fileName + " is not a file");
+    }
+    connected = true;
+  }
+
+  /**
+   * Closes the file system handle, if one was opened, and marks the loader as disconnected.
+   * The connected flag is cleared even when closing fails, so {@link #isConnected()} no longer reports a
+   * connection that has been torn down.
+   *
+   * @throws IOException if closing the file system fails
+   */
+  @Override
+  public void disconnect() throws IOException
+  {
+    try {
+      if (fs != null) {
+        fs.close();
+      }
+    } finally {
+      connected = false;
+    }
+  }
+
+  @Override
+  public boolean isConnected()
+  {
+    return connected;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java
new file mode 100644
index 0000000..d20e87b
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.util.FieldInfo;
+
+/**
+ * <p>JDBCLoader extends {@link JdbcStore}, uses JDBC to connect and implements the {@link BackendLoader} interface.</p> <br/>
+ * <p>
+ * Properties:<br>
+ * <b>queryStmt</b>: Sql Prepared Statement which needs to be executed<br>
+ * <b>tableName</b>: JDBC table name<br>
+ * <br>
+ */
+@InterfaceStability.Evolving
+public class JDBCLoader extends JdbcStore implements BackendLoader
+{
+  protected String queryStmt;
+
+  protected String tableName;
+
+  protected transient List<FieldInfo> includeFieldInfo;
+  protected transient List<FieldInfo> lookupFieldInfo;
+
+  /**
+   * Prepares {@code queryStmt}, binds the elements of the key to its positional parameters (first element to
+   * parameter 1, and so on) and executes the query.
+   * <p>
+   * On success the returned {@link ResultSet} and the statement behind it are still open; releasing them is up
+   * to the consumer of the result. If binding or execution fails, the statement is closed here before the
+   * failure is reported.
+   *
+   * @param key Lookup key; must be a {@link List} whose elements match the query parameters in order.
+   * @return The open {@link ResultSet} of the executed query.
+   * @throws RuntimeException wrapping any {@link SQLException} raised while preparing or executing the query
+   */
+  protected Object getQueryResult(Object key)
+  {
+    PreparedStatement getStatement = null;
+    try {
+      getStatement = getConnection().prepareStatement(queryStmt);
+      // Any List works as a key; only positional access is needed.
+      List<?> keys = (List<?>)key;
+      for (int i = 0; i < keys.size(); i++) {
+        getStatement.setObject(i + 1, keys.get(i));
+      }
+      return getStatement.executeQuery();
+    } catch (SQLException e) {
+      if (getStatement != null) {
+        try {
+          getStatement.close();
+        } catch (SQLException closeExp) {
+          e.addSuppressed(closeExp);
+        }
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Extracts the include-field values from the first row of a query result.
+   * When no include fields are configured, the field list is first derived from the result set's column
+   * metadata, with every column typed as {@link FieldInfo.SupportType#OBJECT}.
+   *
+   * @param result The {@link ResultSet} to read from.
+   * @return Values of the include fields in {@code includeFieldInfo} order, or {@code null} if the result set
+   *         has no row.
+   * @throws RuntimeException wrapping any {@link SQLException} raised while reading the result set
+   */
+  protected ArrayList<Object> getDataFrmResult(Object result) throws RuntimeException
+  {
+    try {
+      ResultSet row = (ResultSet)result;
+      if (!row.next()) {
+        return null;
+      }
+
+      ResultSetMetaData metaData = row.getMetaData();
+      if (CollectionUtils.isEmpty(includeFieldInfo)) {
+        // No include fields were configured: fall back to every column of the result set.
+        if (includeFieldInfo == null) {
+          includeFieldInfo = new ArrayList<>();
+        }
+        for (int column = 1; column <= metaData.getColumnCount(); column++) {
+          String name = metaData.getColumnName(column);
+          // TODO: Take care of type conversion.
+          includeFieldInfo.add(new FieldInfo(name, name, FieldInfo.SupportType.OBJECT));
+        }
+      }
+
+      ArrayList<Object> values = new ArrayList<Object>();
+      for (FieldInfo field : includeFieldInfo) {
+        Object raw = row.getObject(field.getColumnName());
+        values.add(getConvertedData(raw, field));
+      }
+      return values;
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Checks a column value against the Java type declared for its field.
+   * <p>
+   * A SQL NULL ({@code null} value) is passed through unchanged. A value that is an instance of the declared
+   * type is returned as is; this includes fields declared with a general type such as {@code Object}, which an
+   * exact class comparison would never match. Any other value is treated as a type mismatch and replaced by
+   * {@code null}.
+   *
+   * @param object Value read from the result set; may be {@code null}.
+   * @param f      Field description carrying the expected type.
+   * @return The value itself if it is {@code null} or compatible with the declared type, otherwise {@code null}.
+   */
+  private Object getConvertedData(Object object, FieldInfo f)
+  {
+    if (object == null || f.getType().getJavaType().isInstance(object)) {
+      return object;
+    }
+    logger.warn("Type mismatch seen for field {}, returning null", f.getColumnName());
+    return null;
+  }
+
+  /**
+   * Builds the default lookup query: {@code select * from <tableName> where <col1> = ? and <col2> = ? ...},
+   * with one equality condition per lookup field, in {@code lookupFieldInfo} order.
+   *
+   * @return The generated SQL text with one positional parameter per lookup field.
+   */
+  private String generateQueryStmt()
+  {
+    StringBuilder query = new StringBuilder("select * from ").append(tableName).append(" where ");
+    String separator = "";
+    for (FieldInfo lookupField : lookupFieldInfo) {
+      query.append(separator).append(lookupField.getColumnName()).append(" = ?");
+      separator = " and ";
+    }
+
+    String stmt = query.toString();
+    logger.info("generateQueryStmt: {}", stmt);
+    return stmt;
+  }
+
+  public String getQueryStmt()
+  {
+    return queryStmt;
+  }
+
+  /**
+   * Set the sql Prepared Statement if the enrichment mechanism is query based.
+   */
+  public void setQueryStmt(String queryStmt)
+  {
+    this.queryStmt = queryStmt;
+  }
+
+  public String getTableName()
+  {
+    return tableName;
+  }
+
+  /**
+   * Set the table name.
+   */
+  public void setTableName(String tableName)
+  {
+    this.tableName = tableName;
+  }
+
+  @Override
+  public void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo)
+  {
+    this.lookupFieldInfo = lookupFieldInfo;
+    this.includeFieldInfo = includeFieldInfo;
+    if (queryStmt == null) {
+      queryStmt = generateQueryStmt();
+    }
+  }
+
+  @Override
+  public Map<Object, Object> loadInitialData()
+  {
+    return null;
+  }
+
+  /**
+   * Looks up the include-field values for one key by running the lookup query.
+   * The JDBC resources opened for the query are released once the row has been read, whether or not reading
+   * succeeded, so repeated lookups do not accumulate open statements.
+   *
+   * @param key Lookup key, passed to {@link #getQueryResult(Object)}.
+   * @return Values extracted by {@link #getDataFrmResult(Object)}; {@code null} if the query matched no row.
+   */
+  @Override
+  public Object get(Object key)
+  {
+    Object result = getQueryResult(key);
+    try {
+      return getDataFrmResult(result);
+    } finally {
+      releaseQueryResult(result);
+    }
+  }
+
+  /**
+   * Closes a query result and the statement that produced it; anything that is not a {@link ResultSet} is ignored.
+   * Failures are logged rather than thrown so that they cannot mask the outcome of the lookup itself.
+   */
+  private void releaseQueryResult(Object result)
+  {
+    if (!(result instanceof ResultSet)) {
+      return;
+    }
+    ResultSet resultSet = (ResultSet)result;
+    try {
+      // Fetch the owning statement before closing the result set.
+      java.sql.Statement statement = resultSet.getStatement();
+      resultSet.close();
+      if (statement != null) {
+        statement.close();
+      }
+    } catch (SQLException e) {
+      logger.warn("Unable to release JDBC resources of enrichment lookup", e);
+    }
+  }
+
+  @Override
+  public List<Object> getAll(List<Object> keys)
+  {
+    List<Object> values = Lists.newArrayList();
+    for (Object key : keys) {
+      values.add(get(key));
+    }
+    return values;
+  }
+
+  @Override
+  public void put(Object key, Object value)
+  {
+    throw new UnsupportedOperationException("Not supported operation");
+  }
+
+  @Override
+  public void putAll(Map<Object, Object> m)
+  {
+    throw new UnsupportedOperationException("Not supported operation");
+  }
+
+  @Override
+  public void remove(Object key)
+  {
+    throw new UnsupportedOperationException("Not supported operation");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java
new file mode 100644
index 0000000..ecf16ba
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.util.FieldInfo;
+
+/**
+ * This class takes a HashMap tuple as input and extracts value of the lookupKey configured
+ * for this operator. It performs a lookup using {@link com.datatorrent.lib.db.cache.CacheManager} to
+ * find a matching entry and adds the result to the original tuple.
+ *
+ * <p>
+ * Example:
+ * Let's say the input tuple is
+ *    { amount=10.0, channelId=4, productId=3 }
+ * The tuple is modified as below:
+ *    { amount=10.0, channelId=4, productId=3, <b>productCategory=1 </b>}
+ * </p>
+ *
+ * @displayName MapEnricher
+ * @category Database
+ * @tags enrichment, lookup, map
+ */
+@InterfaceStability.Evolving
+public class MapEnricher extends AbstractEnricher<Map<String, Object>, Map<String, Object>>
+{
+  public final transient DefaultInputPort<Map<String, Object>> input = new DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> obj)
+    {
+      processTuple(obj);
+    }
+  };
+
+  public final transient DefaultOutputPort<Map<String, Object>> output = new DefaultOutputPort<>();
+
+  protected void processTuple(Map<String, Object> obj)
+  {
+    enrichTuple(obj);
+  }
+
+  @Override
+  protected Object getKey(Map<String, Object> tuple)
+  {
+    ArrayList<Object> keyList = new ArrayList<Object>();
+
+    for (FieldInfo fieldInfo : lookupFieldInfo) {
+      keyList.add(tuple.get(fieldInfo.getColumnName()));
+    }
+
+    return keyList;
+  }
+
+  @Override
+  protected Map<String, Object> convert(Map<String, Object> in, Object cached)
+  {
+    if (cached == null) {
+      return in;
+    }
+
+    ArrayList<Object> newAttributes = (ArrayList<Object>)cached;
+    if (newAttributes != null) {
+      for (int i = 0; i < includeFieldInfo.size(); i++) {
+        in.put(includeFieldInfo.get(i).getColumnName(), newAttributes.get(i));
+      }
+    }
+    return in;
+  }
+
+  @Override
+  protected void emitEnrichedTuple(Map<String, Object> tuple)
+  {
+    output.emit(tuple);
+  }
+
+  @Override
+  protected Class<?> getIncludeFieldType(String fieldName)
+  {
+    return Object.class;
+  }
+
+  @Override
+  protected Class<?> getLookupFieldType(String fieldName)
+  {
+    return Object.class;
+  }
+
+  /**
+   * Set the fields against which the lookup will be performed.
+   * This is a mandatory parameter to set.
+   *
+   * @param lookupFields List of fields on which lookup happens.
+   * @description $[] Field which become part of lookup key
+   */
+  @Override
+  public void setLookupFields(List<String> lookupFields)
+  {
+    super.setLookupFields(lookupFields);
+  }
+
+  /**
+   * Set fields which will enrich the map.
+   * This is a mandatory parameter to set.
+   *
+   * @param includeFields List of fields.
+   * @description $[] Field which are fetched from store
+   */
+  @Override
+  public void setIncludeFields(List<String> includeFields)
+  {
+    super.setIncludeFields(includeFields);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java
new file mode 100644
index 0000000..2cf7326
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.db.cache.CacheManager;
+
+/**
+ * Cache manager that caches an explicit NULL marker in the primary store for keys absent from the backup.
+ */
+@InterfaceStability.Evolving
+public class NullValuesCacheManager extends CacheManager
+{
+
+  private static final NullObject NULL = new NullObject();
+
+  @Override
+  public Object get(Object key)
+  {
+    Object primaryVal = primary.get(key);
+    if (primaryVal != null) {
+      if (primaryVal == NULL) {
+        return null;
+      }
+
+      return primaryVal;
+    }
+
+    Object backupVal = backup.get(key);
+    if (backupVal != null) {
+      primary.put(key, backupVal);
+    } else {
+      primary.put(key, NULL);
+    }
+
+    return backupVal;
+  }
+
+  private static class NullObject
+  {
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java
new file mode 100644
index 0000000..782fbc5
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.PojoUtils;
+
+
+/**
+ * This class takes a POJO as input and extracts the value of the lookupKey configured
+ * for this operator. It performs a lookup using {@link com.datatorrent.lib.db.cache.CacheManager} to
+ * find a matching entry and adds the result to the original tuple.
+ *
+ * <p>
+ * Properties:<br>
+ * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
+ * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
+ * <br>
+ * </p>
+ *
+ * <p>
+ * Example:
+ * Let's say the input tuple is
+ *    { amount=10.0, channelId=4, productId=3 }
+ * The tuple is modified as below:
+ *    { amount=10.0, channelId=4, productId=3, <b>productCategory=1 </b>}
+ * </p>
+ *
+ * @displayName POJOEnricher
+ * @category Database
+ * @tags enrichment, enricher, pojo, schema, lookup
+ */
+@InterfaceStability.Evolving
+public class POJOEnricher extends AbstractEnricher<Object, Object>
+{
+  private static final Logger logger = LoggerFactory.getLogger(POJOEnricher.class);
+
+  /**
+   * Helper fields
+   */
+  protected Class<?> inputClass;
+  protected Class<?> outputClass;
+  private transient Map<PojoUtils.Getter, PojoUtils.Setter> fieldMap = new HashMap<>();
+  private transient List<PojoUtils.Setter> includeSetters = new ArrayList<>();
+  private transient List<PojoUtils.Getter> lookupGetters = new ArrayList<>();
+
+  /**
+   * AutoMetrics
+   */
+  @AutoMetric
+  private int enrichedTupleCount;
+  @AutoMetric
+  private int errorTupleCount;
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object object)
+    {
+      processTuple(object);
+    }
+  };
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<>();
+
+  protected void processTuple(Object object)
+  {
+    enrichTuple(object);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    enrichedTupleCount = 0;
+    errorTupleCount = 0;
+  }
+
+  @Override
+  protected Object getKey(Object tuple)
+  {
+    ArrayList<Object> keyList = new ArrayList<>();
+    for (PojoUtils.Getter lookupGetter : lookupGetters) {
+      keyList.add(lookupGetter.get(tuple));
+    }
+    return keyList;
+  }
+
+  @Override
+  protected Object convert(Object in, Object cached)
+  {
+    Object o;
+
+    try {
+      o = outputClass.newInstance();
+    } catch (InstantiationException | IllegalAccessException e) {
+      logger.error("Failed to create new instance of output POJO", e);
+      errorTupleCount++;
+      error.emit(in);
+      return null;
+    }
+
+    try {
+      for (Map.Entry<PojoUtils.Getter, PojoUtils.Setter> entry : fieldMap.entrySet()) {
+        entry.getValue().set(o, entry.getKey().get(in));
+      }
+    } catch (RuntimeException e) {
+      logger.error("Failed to set the property. Continuing with default.", e);
+      errorTupleCount++;
+      error.emit(in);
+      return null;
+    }
+
+    if (cached == null) {
+      return o;
+    }
+
+    ArrayList<Object> includeObjects = (ArrayList<Object>)cached;
+    for (int i = 0; i < includeSetters.size(); i++) {
+      try {
+        includeSetters.get(i).set(o, includeObjects.get(i));
+      } catch (RuntimeException e) {
+        logger.error("Failed to set the property. Continuing with default.", e);
+        errorTupleCount++;
+        error.emit(in);
+        return null;
+      }
+    }
+
+    return o;
+  }
+
+  @Override
+  protected void emitEnrichedTuple(Object tuple)
+  {
+    output.emit(tuple);
+    enrichedTupleCount++;
+  }
+
+  @Override
+  protected Class<?> getIncludeFieldType(String fieldName)
+  {
+    try {
+      return outputClass.getDeclaredField(fieldName).getType();
+    } catch (NoSuchFieldException e) {
+      logger.warn("Failed to find given fieldName, returning object type", e);
+      return Object.class;
+    }
+  }
+
+  @Override
+  protected Class<?> getLookupFieldType(String fieldName)
+  {
+    try {
+      return inputClass.getDeclaredField(fieldName).getType();
+    } catch (NoSuchFieldException e) {
+      logger.warn("Failed to find given fieldName, returning object type", e);
+      return Object.class;
+    }
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private PojoUtils.Setter generateSettersForField(Class<?> klass, String outputFieldName)
+      throws NoSuchFieldException, SecurityException
+  {
+    Field f = klass.getDeclaredField(outputFieldName);
+    Class c = ClassUtils.primitiveToWrapper(f.getType());
+    return PojoUtils.createSetter(klass, outputFieldName, c);
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private PojoUtils.Getter generateGettersForField(Class<?> klass, String inputFieldName)
+      throws NoSuchFieldException, SecurityException
+  {
+    Field f = klass.getDeclaredField(inputFieldName);
+    Class c = ClassUtils.primitiveToWrapper(f.getType());
+    return PojoUtils.createGetter(klass, inputFieldName, c);
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+    super.activate(context);
+
+    for (Field field : inputClass.getDeclaredFields()) {
+      try {
+        fieldMap.put(generateGettersForField(inputClass, field.getName()),
+            generateSettersForField(outputClass, field.getName()));
+      } catch (NoSuchFieldException e) {
+        throw new RuntimeException("Unable to find field with name " + field.getName() + ", ignoring that field.", e);
+      }
+    }
+
+    for (FieldInfo fieldInfo : this.includeFieldInfo) {
+      try {
+        includeSetters.add(generateSettersForField(outputClass, fieldInfo.getColumnName()));
+      } catch (NoSuchFieldException e) {
+        throw new RuntimeException("Given field name is not present in output POJO", e);
+      }
+    }
+
+    for (FieldInfo fieldInfo : this.lookupFieldInfo) {
+      try {
+        lookupGetters.add(generateGettersForField(inputClass, fieldInfo.getColumnName()));
+      } catch (NoSuchFieldException e) {
+        throw new RuntimeException("Given lookup field is not present in POJO", e);
+      }
+    }
+  }
+
+  /**
+   * Set the fields against which the lookup will be performed.
+   * This is a mandatory parameter to set.
+   *
+   * @param lookupFields List of fields on which lookup happens.
+   * @description $[] Field which become part of lookup key
+   * @useSchema $[] input.fields[].name
+   */
+  @Override
+  public void setLookupFields(List<String> lookupFields)
+  {
+    super.setLookupFields(lookupFields);
+  }
+
+  /**
+   * Set fields which will enrich the POJO.
+   * This is a mandatory parameter to set.
+   *
+   * @param includeFields List of fields.
+   * @description $[] Field which are fetched from store
+   * @useSchema $[] output.fields[].name
+   */
+  @Override
+  public void setIncludeFields(List<String> includeFields)
+  {
+    super.setIncludeFields(includeFields);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/ReadOnlyBackup.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/ReadOnlyBackup.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/ReadOnlyBackup.java
new file mode 100644
index 0000000..157dbc9
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/ReadOnlyBackup.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.util.FieldInfo;
+
+/**
+ * ReadOnly abstract implementation of BackendLoader.
+ */
+@InterfaceStability.Evolving
+public abstract class ReadOnlyBackup implements BackendLoader
+{
+  protected transient List<FieldInfo> includeFieldInfo;
+  protected transient List<FieldInfo> lookupFieldInfo;
+
+  @Override
+  public void put(Object key, Object value)
+  {
+    throw new UnsupportedOperationException("Not supported operation");
+  }
+
+  @Override
+  public void putAll(Map<Object, Object> m)
+  {
+    throw new UnsupportedOperationException("Not supported operation");
+  }
+
+  @Override
+  public void remove(Object key)
+  {
+    throw new UnsupportedOperationException("Not supported operation");
+  }
+
+  @Override
+  public void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo)
+  {
+    this.includeFieldInfo = includeFieldInfo;
+    this.lookupFieldInfo = lookupFieldInfo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java
deleted file mode 100644
index cbd4d5e..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java
+++ /dev/null
@@ -1,170 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.lib.db.cache.CacheManager;
-import com.datatorrent.lib.db.cache.CacheStore;
-import com.datatorrent.lib.db.cache.CacheStore.ExpiryType;
-import com.esotericsoftware.kryo.NotNull;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
- * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
- *
- * Properties:<br>
- * <b>lookupFieldsStr</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
- * <b>includeFieldsStr</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
- * <b>store</b>: Specify the type of loader for looking data<br>
- * <br>
- *
- *
- * @displayName Abstract Enrichment Operator
- * @tags Enrichment
- * @param <INPUT> Type of tuples which are received by this operator</T>
- * @param <OUTPUT> Type of tuples which are emitted by this operator</T>
- * @since 2.1.0
- */
-public abstract class AbstractEnrichmentOperator<INPUT, OUTPUT> extends BaseOperator
-{
-  /**
-   * Keep lookup data cache for fast access.
-   */
-  private transient CacheManager cacheManager;
-
-  private transient CacheStore primaryCache = new CacheStore();
-
-  private int entryExpiryDurationInMillis = 24 * 60 * 60 * 1000;
-  private int cacheCleanupInMillis = 24 * 60 * 60 * 1000;
-  private int cacheSize = 1024;
-
-  public transient DefaultOutputPort<OUTPUT> output = new DefaultOutputPort<OUTPUT>();
-
-  @InputPortFieldAnnotation(optional = true)
-  public transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>()
-  {
-    @Override public void process(INPUT tuple)
-    {
-      processTuple(tuple);
-    }
-  };
-
-  private EnrichmentBackup store;
-
-  @NotNull
-  protected String lookupFieldsStr;
-
-  protected String includeFieldsStr;
-
-  protected transient List<String> lookupFields = new ArrayList<String>();
-  protected transient List<String> includeFields = new ArrayList<String>();
-
-  protected void processTuple(INPUT tuple) {
-    Object key = getKey(tuple);
-    if(key != null) {
-      Object result = cacheManager.get(key);
-      OUTPUT out = convert(tuple, result);
-      emitTuple(out);
-    }
-  }
-
-  protected abstract Object getKey(INPUT tuple);
-
-  protected void emitTuple(OUTPUT tuple) {
-    output.emit(tuple);
-  }
-
-  /* Add data from cached value to input field */
-  protected abstract OUTPUT convert(INPUT in, Object cached);
-
-  @Override public void setup(Context.OperatorContext context)
-  {
-    super.setup(context);
-
-    cacheManager = new NullValuesCacheManager();
-
-    // set expiration to one day.
-    primaryCache.setEntryExpiryDurationInMillis(entryExpiryDurationInMillis);
-    primaryCache.setCacheCleanupInMillis(cacheCleanupInMillis);
-    primaryCache.setEntryExpiryStrategy(ExpiryType.EXPIRE_AFTER_WRITE);
-    primaryCache.setMaxCacheSize(cacheSize);
-
-    lookupFields = Arrays.asList(lookupFieldsStr.split(","));
-    if (includeFieldsStr != null) {
-      includeFields = Arrays.asList(includeFieldsStr.split(","));
-    }
-
-    try {
-      store.setFields(lookupFields, includeFields);
-
-      cacheManager.setPrimary(primaryCache);
-      cacheManager.setBackup(store);
-      cacheManager.initialize();
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to initialize primary cache", e);
-    }
-  }
-
-  /**
-   * Set the type of backup store for storing and searching data.
-   */
-  public void setStore(EnrichmentBackup store) {
-    this.store = store;
-  }
-
-  public EnrichmentBackup getStore() {
-    return store;
-  }
-
-  public CacheStore getPrimaryCache()
-  {
-    return primaryCache;
-  }
-
-  public String getLookupFieldsStr()
-  {
-    return lookupFieldsStr;
-  }
-
-  /**
-   * Set the lookup fields for quick searching. It would be in comma separated list
-   */
-  public void setLookupFieldsStr(String lookupFieldsStr)
-  {
-    this.lookupFieldsStr = lookupFieldsStr;
-  }
-
-  public String getIncludeFieldsStr()
-  {
-    return includeFieldsStr;
-  }
-
-  /**
-   * Set the list of comma separated fields to be added/replaced to the incoming tuple.
-   */
-  public void setIncludeFieldsStr(String includeFieldsStr)
-  {
-    this.includeFieldsStr = includeFieldsStr;
-  }
-
-  public void setEntryExpiryDurationInMillis(int entryExpiryDurationInMillis)
-  {
-    this.entryExpiryDurationInMillis = entryExpiryDurationInMillis;
-  }
-
-  public void setCacheCleanupInMillis(int cacheCleanupInMillis)
-  {
-    this.cacheCleanupInMillis = cacheCleanupInMillis;
-  }
-
-  public void setCacheSize(int cacheSize)
-  {
-    this.cacheSize = cacheSize;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java
deleted file mode 100644
index 9155b13..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
- */
-
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.lib.db.cache.CacheManager;
-import java.util.List;
-/**
- * @since 3.1.0
- */
-
-public interface EnrichmentBackup extends CacheManager.Backup
-{
-  public void setFields(List<String> lookupFields,List<String> includeFields);
-  public boolean needRefresh();
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java
deleted file mode 100644
index 2effed0..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
- */
-package com.datatorrent.contrib.enrichment;
-
-import com.esotericsoftware.kryo.NotNull;
-import com.google.common.collect.Maps;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-import org.codehaus.jackson.type.TypeReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-/**
- * @since 3.1.0
- */
-
-public class FSLoader extends ReadOnlyBackup
-{
-  @NotNull
-  private String fileName;
-
-  private transient Path filePath;
-  private transient FileSystem fs;
-  private transient boolean connected;
-
-  private transient static final ObjectMapper mapper = new ObjectMapper();
-  private transient static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>()
-  {
-  });
-  private transient static final Logger logger = LoggerFactory.getLogger(FSLoader.class);
-
-  public String getFileName()
-  {
-    return fileName;
-  }
-
-  public void setFileName(String fileName)
-  {
-    this.fileName = fileName;
-  }
-
-  @Override public Map<Object, Object> loadInitialData()
-  {
-    Map<Object, Object> result = null;
-    FSDataInputStream in = null;
-    BufferedReader bin = null;
-    try {
-      result = Maps.newHashMap();
-      in = fs.open(filePath);
-      bin = new BufferedReader(new InputStreamReader(in));
-      String line;
-      while ((line = bin.readLine()) != null) {
-        try {
-          Map<String, Object> tuple = reader.readValue(line);
-          if(CollectionUtils.isEmpty(includeFields)) {
-            if(includeFields == null)
-              includeFields = new ArrayList<String>();
-            for (Map.Entry<String, Object> e : tuple.entrySet()) {
-              includeFields.add(e.getKey());
-            }
-          }
-          ArrayList<Object> includeTuple = new ArrayList<Object>();
-          for(String s: includeFields) {
-            includeTuple.add(tuple.get(s));
-          }
-          result.put(getKey(tuple), includeTuple);
-        } catch (JsonProcessingException parseExp) {
-          logger.info("Unable to parse line {}", line);
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } finally {
-      if(bin != null)
-        IOUtils.closeQuietly(bin);
-      if(in != null)
-        IOUtils.closeQuietly(in);
-      try {
-        fs.close();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    logger.debug("loading initial data {}", result.size());
-    return result;
-  }
-
-  private Object getKey(Map<String, Object> tuple)
-  {
-    ArrayList<Object> lst = new ArrayList<Object>();
-    for(String key : lookupFields) {
-      lst.add(tuple.get(key));
-    }
-    return lst;
-  }
-
-  @Override public Object get(Object key)
-  {
-    return null;
-  }
-
-  @Override public List<Object> getAll(List<Object> keys)
-  {
-    return null;
-  }
-
-  @Override public void connect() throws IOException
-  {
-    Configuration conf = new Configuration();
-    this.filePath = new Path(fileName);
-    this.fs = FileSystem.newInstance(filePath.toUri(), conf);
-    if (!fs.isFile(filePath))
-      throw new IOException("Provided path " + fileName + " is not a file");
-    connected = true;
-  }
-
-  @Override public void disconnect() throws IOException
-  {
-    if (fs != null)
-      fs.close();
-  }
-
-  @Override public boolean isConnected()
-  {
-    return connected;
-  }
-
-  @Override
-  public boolean needRefresh() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java
deleted file mode 100755
index 7040a7a..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.contrib.hbase.HBaseStore;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * <p>HBaseLoader extends from {@link HBaseStore} uses HBase to connect and implements EnrichmentBackup interface.</p> <br/>
- *
- * Properties:<br>
- * <b>includeFamilys</b>: List of comma separated families and each family corresponds to the group name of column fields in includeFieldsStr. Ex: Family1,Family2<br>
- * <br>
- *
- * @displayName HBaseLoader
- * @tags Loader
- * @since 2.1.0
- */
-public class HBaseLoader extends HBaseStore implements EnrichmentBackup
-{
-  protected transient List<String> includeFields;
-  protected transient List<String> lookupFields;
-  protected transient List<String> includeFamilys;
-
-  protected Object getQueryResult(Object key)
-  {
-    try {
-      Get get = new Get(getRowBytes(((ArrayList)key).get(0)));
-      int idx = 0;
-      for(String f : includeFields) {
-        get.addColumn(Bytes.toBytes(includeFamilys.get(idx++)), Bytes.toBytes(f));
-      }
-      return getTable().get(get);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  protected ArrayList<Object> getDataFrmResult(Object result)
-  {
-    Result res = (Result)result;
-    if (res == null || res.isEmpty())
-      return null;
-    ArrayList<Object> columnInfo = new ArrayList<Object>();
-
-    if(CollectionUtils.isEmpty(includeFields)) {
-      if(includeFields == null) {
-        includeFields = new ArrayList<String>();
-        includeFamilys.clear();
-        includeFamilys = new ArrayList<String>();
-      }
-      for (KeyValue kv: res.raw()) {
-        includeFields.add(new String(kv.getQualifier()));
-        includeFamilys.add(new String(kv.getFamily()));
-      }
-    }
-    for(KeyValue kv : res.raw()) {
-      columnInfo.add(kv.getValue());
-    }
-    return columnInfo;
-  }
-
-  private byte[] getRowBytes(Object key)
-  {
-    return ((String)key).getBytes();
-  }
-
-  @Override public void setFields(List<String> lookupFields,List<String> includeFields)
-  {
-    this.includeFields = includeFields;
-    this.lookupFields = lookupFields;
-  }
-
-  /**
-   * Set the familyStr and would be in the form of comma separated list.
-   */
-  public void setIncludeFamilyStr(String familyStr)
-  {
-    this.includeFamilys = Arrays.asList(familyStr.split(","));
-  }
-
-  @Override public boolean needRefresh()
-  {
-    return false;
-  }
-
-  @Override public Map<Object, Object> loadInitialData()
-  {
-    return null;
-  }
-
-  @Override public Object get(Object key)
-  {
-    return getDataFrmResult(getQueryResult(key));
-  }
-
-  @Override public List<Object> getAll(List<Object> keys)
-  {
-    List<Object> values = Lists.newArrayList();
-    for (Object key : keys) {
-      values.add(get(key));
-    }
-    return values;
-  }
-
-  @Override public void put(Object key, Object value)
-  {
-    throw new RuntimeException("Not supported operation");
-  }
-
-  @Override public void putAll(Map<Object, Object> m)
-  {
-    throw new RuntimeException("Not supported operation");
-  }
-
-  @Override public void remove(Object key)
-  {
-    throw new RuntimeException("Not supported operation");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java
deleted file mode 100644
index 3b1a8cf..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java
+++ /dev/null
@@ -1,158 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.lib.db.jdbc.JdbcStore;
-import com.google.common.collect.Lists;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections.CollectionUtils;
-
-/**
- * <p>HBaseLoader extends from {@link JdbcStore} uses JDBC to connect and implements EnrichmentBackup interface.</p> <br/>
- *
- * Properties:<br>
- * <b>queryStmt</b>: Sql Prepared Statement which needs to be executed<br>
- * <b>tableName</b>: JDBC table name<br>
- * <br>
- *
- * @displayName JDBCLoader
- * @tags Loader
- * @since 2.1.0
- */
-public class JDBCLoader extends JdbcStore implements EnrichmentBackup
-{
-  protected String queryStmt;
-
-  protected String tableName;
-
-  protected transient List<String> includeFields;
-  protected transient List<String> lookupFields;
-
-  protected Object getQueryResult(Object key)
-  {
-    try {
-      PreparedStatement getStatement = getConnection().prepareStatement(queryStmt);
-      ArrayList<Object> keys = (ArrayList<Object>) key;
-      for (int i = 0; i < keys.size(); i++) {
-        getStatement.setObject(i+1, keys.get(i));
-      }
-      return getStatement.executeQuery();
-    } catch (SQLException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  protected ArrayList<Object> getDataFrmResult(Object result) throws RuntimeException
-  {
-    try {
-      ResultSet resultSet = (ResultSet) result;
-      if (resultSet.next()) {
-        ResultSetMetaData rsdata = resultSet.getMetaData();
-        // If the includefields is empty, populate it from ResultSetMetaData
-        if(CollectionUtils.isEmpty(includeFields)) {
-          if(includeFields == null)
-            includeFields = new ArrayList<String>();
-          for (int i = 1; i <= rsdata.getColumnCount(); i++) {
-            includeFields.add(rsdata.getColumnName(i));
-          }
-        }
-        ArrayList<Object> res = new ArrayList<Object>();
-        for(String f : includeFields) {
-          res.add(resultSet.getObject(f));
-        }
-        return res;
-      } else
-        return null;
-    } catch (SQLException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private String generateQueryStmt()
-  {
-    String stmt = "select * from " + tableName + " where ";
-    for (int i = 0; i < lookupFields.size(); i++) {
-      stmt = stmt + lookupFields.get(i) + " = ? ";
-      if(i != lookupFields.size() - 1) {
-        stmt = stmt + " and ";
-      }
-    }
-    logger.info("generateQueryStmt: {}", stmt);
-    return stmt;
-  }
-
-  public String getQueryStmt()
-  {
-    return queryStmt;
-  }
-
-  @Override
-  public boolean needRefresh() {
-    return false;
-  }
-
-  /**
-   * Set the sql Prepared Statement if the enrichment mechanism is query based.
-   */
-  public void setQueryStmt(String queryStmt)
-  {
-    this.queryStmt = queryStmt;
-  }
-
-  public String getTableName()
-  {
-    return tableName;
-  }
-  /**
-   * Set the table name.
-   */
-  public void setTableName(String tableName)
-  {
-    this.tableName = tableName;
-  }
-
-  @Override public void setFields(List<String> lookupFields,List<String> includeFields)
-  {
-    this.includeFields = includeFields;
-    this.lookupFields = lookupFields;
-    if(queryStmt == null)
-      queryStmt = generateQueryStmt();
-  }
-  @Override public Map<Object, Object> loadInitialData()
-  {
-    return null;
-  }
-
-  @Override public Object get(Object key)
-  {
-    return getDataFrmResult(getQueryResult(key));
-  }
-
-  @Override public List<Object> getAll(List<Object> keys)
-  {
-    List<Object> values = Lists.newArrayList();
-    for (Object key : keys) {
-      values.add(get(key));
-    }
-    return values;
-  }
-
-  @Override public void put(Object key, Object value)
-  {
-    throw new RuntimeException("Not supported operation");
-  }
-
-  @Override public void putAll(Map<Object, Object> m)
-  {
-    throw new RuntimeException("Not supported operation");
-  }
-
-  @Override public void remove(Object key)
-  {
-    throw new RuntimeException("Not supported operation");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java
deleted file mode 100644
index 040b5ae..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-/**
- *
- * This class takes a HashMap tuple as input and extract the value of the lookupKey configured
- * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
- * specified in the file/DB or based on include fields are added to original tuple.
- *
- * Example
- * The file contains data in json format, one entry per line. during setup entire file is read and
- * kept in memory for quick lookup.
- * If file contains following lines, and operator is configured with lookup key "productId"
- * { "productId": 1, "productCategory": 3 }
- * { "productId": 4, "productCategory": 10 }
- * { "productId": 3, "productCategory": 1 }
- *
- * And input tuple is
- * { amount=10.0, channelId=4, productId=3 }
- *
- * The tuple is modified as below before operator emits it on output port.
- * { amount=10.0, channelId=4, productId=3, productCategory=1 }
- *
- *
- * @displayName MapEnrichment
- * @category Database
- * @tags enrichment, lookup
- *
- * @since 2.1.0
- */
-public class MapEnrichmentOperator extends AbstractEnrichmentOperator<Map<String, Object>, Map<String, Object>>
-{
-  @Override protected Object getKey(Map<String, Object> tuple)
-  {
-    ArrayList<Object> keyList = new ArrayList<Object>();
-    for(String key : lookupFields) {
-      keyList.add(tuple.get(key));
-    }
-    return keyList;
-  }
-
-  @Override protected Map<String, Object> convert(Map<String, Object> in, Object cached)
-  {
-    if (cached == null)
-      return in;
-
-    ArrayList<Object> newAttributes = (ArrayList<Object>)cached;
-    if(newAttributes != null) {
-      for (int i = 0; i < includeFields.size(); i++) {
-        in.put(includeFields.get(i), newAttributes.get(i));
-      }
-    }
-    return in;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java
deleted file mode 100644
index f668683..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
- */
-
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.lib.db.cache.CacheManager;
-
-/**
- * @since 3.1.0
- */
-public class NullValuesCacheManager extends CacheManager
-{
-
-  private static final NullObject NULL = new NullObject();
-  @Override
-  public Object get(Object key)
-  {
-    Object primaryVal = primary.get(key);
-    if (primaryVal != null) {
-      if (primaryVal == NULL) {
-        return null;
-      }
-
-      return primaryVal;
-    }
-
-    Object backupVal = backup.get(key);
-    if (backupVal != null) {
-      primary.put(key, backupVal);
-    } else {
-      primary.put(key, NULL);
-    }
-    return backupVal;
-
-  }
-
-  private static class NullObject
-  {
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java
deleted file mode 100644
index e707198..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java
+++ /dev/null
@@ -1,185 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.lib.util.PojoUtils;
-import com.datatorrent.lib.util.PojoUtils.Getter;
-import com.datatorrent.lib.util.PojoUtils.Setter;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import com.esotericsoftware.kryo.NotNull;
-import org.apache.commons.lang3.ClassUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * This class takes a POJO as input and extract the value of the lookupKey configured
- * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
- * specified in the file/DB or based on include fieldMap are added to original tuple.
- *
- * Properties:<br>
- * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
- * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
- * <br>
- *
- * Example
- * The file contains data in json format, one entry per line. during setup entire file is read and
- * kept in memory for quick lookup.
- * If file contains following lines, and operator is configured with lookup key "productId"
- * { "productId": 1, "productCategory": 3 }
- * { "productId": 4, "productCategory": 10 }
- * { "productId": 3, "productCategory": 1 }
- *
- * And input tuple is
- * { amount=10.0, channelId=4, productId=3 }
- *
- * The tuple is modified as below before operator emits it on output port.
- * { amount=10.0, channelId=4, productId=3, productCategory=1 }
- *
- * @displayName BeanEnrichment
- * @category Database
- * @tags enrichment, lookup
- *
- * @since 2.1.0
- */
-public class POJOEnrichmentOperator extends AbstractEnrichmentOperator<Object, Object> {
-
-  private transient static final Logger logger = LoggerFactory.getLogger(POJOEnrichmentOperator.class);
-  protected Class inputClass;
-  protected Class outputClass;
-  private transient List<Getter> getters = new LinkedList<Getter>();
-  private transient List<FieldObjectMap> fieldMap = new LinkedList<FieldObjectMap>();
-  private transient List<Setter> updateSetter = new LinkedList<Setter>();
-
-  @NotNull
-  protected String outputClassStr;
-
-
-  @Override
-  protected Object getKey(Object tuple) {
-    ArrayList<Object> keyList = new ArrayList<Object>();
-    for(Getter g : getters) {
-        keyList.add(g.get(tuple));
-    }
-    return keyList;
-  }
-
-  @Override
-  protected Object convert(Object in, Object cached) {
-    try {
-      Object o = outputClass.newInstance();
-
-      // Copy the fields from input to output
-      for (FieldObjectMap map : fieldMap) {
-        map.set.set(o, map.get.get(in));
-      }
-
-      if (cached == null)
-        return o;
-
-      if(updateSetter.size() == 0 && includeFields.size() != 0) {
-        populateUpdatesFrmIncludeFields();
-      }
-      ArrayList<Object> newAttributes = (ArrayList<Object>)cached;
-      int idx = 0;
-      for(Setter s: updateSetter) {
-        s.set(o, newAttributes.get(idx));
-        idx++;
-      }
-      return o;
-    } catch (InstantiationException e) {
-      throw new RuntimeException(e);
-    } catch (IllegalAccessException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void setup(Context.OperatorContext context) {
-    super.setup(context);
-    populateUpdatesFrmIncludeFields();
-  }
-
-  private void populateGettersFrmLookup()
-  {
-    for (String fName : lookupFields) {
-        Getter f = PojoUtils.createGetter(inputClass, fName, Object.class);
-        getters.add(f);
-    }
-  }
-
-  private void populateGettersFrmInput()
-  {
-    Field[] fields = inputClass.getFields();
-    for (Field f : fields) {
-      Class c = ClassUtils.primitiveToWrapper(f.getType());
-      FieldObjectMap fieldMap = new FieldObjectMap();
-      fieldMap.get = PojoUtils.createGetter(inputClass, f.getName(), c);
-      try {
-        fieldMap.set = PojoUtils.createSetter(outputClass, f.getName(), c);
-      } catch (Throwable e) {
-        throw new RuntimeException("Failed to initialize Output Class for field: " + f.getName(), e);
-      }
-      this.fieldMap.add(fieldMap);
-    }
-  }
-
-  private void populateUpdatesFrmIncludeFields() {
-    if (this.outputClass == null) {
-      logger.debug("Creating output class instance from string: {}", outputClassStr);
-      try {
-        this.outputClass = this.getClass().getClassLoader().loadClass(outputClassStr);
-      } catch (ClassNotFoundException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    for (String fName : includeFields) {
-      try {
-        Field f = outputClass.getField(fName);
-        Class c;
-        if(f.getType().isPrimitive()) {
-          c = ClassUtils.primitiveToWrapper(f.getType());
-        } else {
-           c = f.getType();
-        }
-        try {
-          updateSetter.add(PojoUtils.createSetter(outputClass, f.getName(), c));
-        } catch (Throwable e) {
-          throw new RuntimeException("Failed to initialize Output Class for field: " + f.getName(), e);
-        }
-      } catch (NoSuchFieldException e) {
-        throw new RuntimeException("Cannot find field '" + fName + "' in output class", e);
-      }
-    }
-  }
-
-  public String getOutputClassStr()
-  {
-    return outputClassStr;
-  }
-
-  public void setOutputClassStr(String outputClassStr)
-  {
-    this.outputClassStr = outputClassStr;
-  }
-
-  @Override protected void processTuple(Object tuple)
-  {
-    if (inputClass == null) {
-      inputClass = tuple.getClass();
-      populateGettersFrmLookup();
-      populateGettersFrmInput();
-    }
-    super.processTuple(tuple);
-  }
-
-  private class FieldObjectMap
-  {
-    public Getter get;
-    public Setter set;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java
deleted file mode 100644
index 3357704..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
- */
-package com.datatorrent.contrib.enrichment;
-
-import java.util.List;
-import java.util.Map;
-/**
- * @since 3.1.0
- */
-
-public abstract class ReadOnlyBackup implements EnrichmentBackup
-{
-  protected transient List<String> includeFields;
-  protected transient List<String> lookupFields;
-
-  @Override public void put(Object key, Object value)
-  {
-    throw new RuntimeException("Not supported operation");
-  }
-
-  @Override public void putAll(Map<Object, Object> m)
-  {
-    throw new RuntimeException("Not supported operation");
-  }
-
-  @Override public void remove(Object key)
-  {
-    throw new RuntimeException("Not supported operation");
-  }
-
-  @Override public void setFields(List<String> lookupFields,List<String> includeFields)
-  {
-    this.includeFields = includeFields;
-    this.lookupFields = lookupFields;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java
deleted file mode 100644
index 7d5b4cd..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java
+++ /dev/null
@@ -1 +0,0 @@
-package com.datatorrent.contrib.enrichment;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/EmployeeOrder.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/EmployeeOrder.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/EmployeeOrder.java
new file mode 100644
index 0000000..6015435
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/EmployeeOrder.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+/**
+ * Test POJO carrying the fields OID, ID, amount, NAME, AGE, ADDRESS and SALARY.
+ *
+ * Every field is public and additionally exposed through a getter/setter pair.
+ */
+public class EmployeeOrder
+{
+  public int OID;
+  public int ID;
+  public double amount;
+  public String NAME;
+  public int AGE;
+  public String ADDRESS;
+  public double SALARY;
+
+  public int getOID()
+  {
+    return OID;
+  }
+
+  public void setOID(int OID)
+  {
+    this.OID = OID;
+  }
+
+  public int getID()
+  {
+    return ID;
+  }
+
+  public void setID(int ID)
+  {
+    this.ID = ID;
+  }
+
+  public int getAGE()
+  {
+    return AGE;
+  }
+
+  public void setAGE(int AGE)
+  {
+    this.AGE = AGE;
+  }
+
+  public String getNAME()
+  {
+    return NAME;
+  }
+
+  public void setNAME(String NAME)
+  {
+    this.NAME = NAME;
+  }
+
+  public double getAmount()
+  {
+    return amount;
+  }
+
+  public void setAmount(double amount)
+  {
+    this.amount = amount;
+  }
+
+  public String getADDRESS()
+  {
+    return ADDRESS;
+  }
+
+  public void setADDRESS(String ADDRESS)
+  {
+    this.ADDRESS = ADDRESS;
+  }
+
+  public double getSALARY()
+  {
+    return SALARY;
+  }
+
+  public void setSALARY(double SALARY)
+  {
+    this.SALARY = SALARY;
+  }
+
+  /**
+   * Renders every field as {@code {OID=..., ID=..., ...}} on a single line.
+   * ADDRESS is trimmed when set; an unset ({@code null}) ADDRESS is printed as
+   * {@code null}, like NAME, instead of causing a NullPointerException.
+   *
+   * @return textual form of this object
+   */
+  @Override
+  public String toString()
+  {
+    return "{" +
+        "OID=" + OID +
+        ", ID=" + ID +
+        ", amount=" + amount +
+        ", NAME='" + NAME + '\'' +
+        ", AGE=" + AGE +
+        ", ADDRESS='" + (ADDRESS == null ? null : ADDRESS.trim()) + '\'' +
+        ", SALARY=" + SALARY +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
new file mode 100644
index 0000000..f24a13c
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.collect.Maps;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Exercises {@link MapEnricher} backed by an {@link FSLoader} that reads a mapping file
+ * from the test classpath.
+ */
+public class FileEnrichmentTest
+{
+
+  @Rule
+  public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();
+
+  /**
+   * Feeds one map tuple with productId=3 through the operator and checks that the three
+   * original fields are kept and an Integer productCategory of 5 is added.
+   */
+  @Test
+  public void testEnrichmentOperator() throws IOException, InterruptedException
+  {
+    URL origUrl = this.getClass().getResource("/productmapping.txt");
+
+    // Work on a fresh copy of the mapping file placed next to the other test resources.
+    URL fileUrl = new URL(this.getClass().getResource("/").toString() + "productmapping1.txt");
+    FileUtils.deleteQuietly(new File(fileUrl.getPath()));
+    FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath()));
+
+    MapEnricher oper = new MapEnricher();
+    FSLoader store = new FSLoader();
+    store.setFileName(fileUrl.toString());
+    oper.setLookupFields(Arrays.asList("productId"));
+    oper.setIncludeFields(Arrays.asList("productCategory"));
+    oper.setStore(store);
+
+    oper.setup(null);
+
+    /* The file contains 6 entries, but one entry is a duplicate,
+     * so the cache should contain only 5 entries after scanning the input file.
+     * NOTE(review): the disabled assertion below expects 7, which disagrees with
+     * this comment - confirm the real count before re-enabling it.
+     */
+    //Assert.assertEquals("Number of mappings ", 7, oper.cache.size());
+
+    CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<>();
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+    oper.output.setSink(tmp);
+
+    oper.activate(null);
+
+    oper.beginWindow(0);
+    Map<String, Object> tuple = Maps.newHashMap();
+    tuple.put("productId", 3);
+    tuple.put("channelId", 4);
+    tuple.put("amount", 10.0);
+
+    // The operator receives a copy, so "tuple" keeps the original values for comparison.
+    Kryo kryo = new Kryo();
+    oper.input.process(kryo.copy(tuple));
+
+    oper.endWindow();
+
+    oper.deactivate();
+
+    /* Exactly one tuple must have been emitted */
+    Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
+    Map<String, Object> emitted = sink.collectedTuples.iterator().next();
+
+    /* The fields present in the original event are kept as they are */
+    Assert.assertEquals("Number of fields in emitted tuple", 4, emitted.size());
+    Assert.assertEquals("value of productId is 3", tuple.get("productId"), emitted.get("productId"));
+    Assert.assertEquals("value of channelId is 4", tuple.get("channelId"), emitted.get("channelId"));
+    Assert.assertEquals("value of amount is 10.0", tuple.get("amount"), emitted.get("amount"));
+
+    /* Check if productCategory is added to the event */
+    Assert.assertTrue("productCategory is part of tuple", emitted.containsKey("productCategory"));
+    Assert.assertEquals("value of product category is 5", 5, emitted.get("productCategory"));
+    Assert.assertTrue(emitted.get("productCategory") instanceof Integer);
+  }
+}
+