You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sa...@apache.org on 2016/05/10 09:05:56 UTC

[2/4] incubator-apex-malhar git commit: APEXMALHAR-2023 Adding Enrichment Operator to Malhar

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/JDBCLoaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/JDBCLoaderTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/JDBCLoaderTest.java
new file mode 100644
index 0000000..f450736
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/JDBCLoaderTest.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Tests {@link JDBCLoader} lookups against an in-memory HSQLDB table.
+ */
+public class JDBCLoaderTest
+{
+  static final Logger logger = LoggerFactory.getLogger(JDBCLoaderTest.class);
+
+  /**
+   * JUnit rule that connects the loader and creates and fills the COMPANY table before each
+   * test, then drops the table when the test has finished.
+   */
+  public static class TestMeta extends TestWatcher
+  {
+    JDBCLoader dbloader;
+    int[] id = {1, 2, 3, 4};
+    String[] name = {"Paul", "Allen", "Teddy", "Mark"};
+    int[] age = {32, 25, 23, 25};
+    String[] address = {"California", "Texas", "Norway", "Rich-Mond"};
+    double[] salary = {20000.00, 15000.00, 20000.00, 65000.00};
+
+    @Override
+    protected void starting(Description description)
+    {
+      try {
+        dbloader = new JDBCLoader();
+        dbloader.setDatabaseDriver("org.hsqldb.jdbcDriver");
+        dbloader.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");
+        dbloader.setTableName("COMPANY");
+
+        dbloader.connect();
+        createTable();
+        insertRecordsInTable();
+      } catch (Throwable e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+
+    /**
+     * Creates the table named by the loader with ID, NAME, AGE, ADDRESS and SALARY columns.
+     */
+    private void createTable()
+    {
+      String createTable = "CREATE TABLE " + dbloader.getTableName() + " " +
+          "(ID INT PRIMARY KEY, " +
+          "NAME CHAR(50), " +
+          "AGE INT, " +
+          "ADDRESS CHAR(50), " +
+          "SALARY REAL)";
+      logger.debug(createTable);
+
+      // try-with-resources so the statement is closed even when the DDL fails
+      try (Statement stmt = dbloader.getConnection().createStatement()) {
+        stmt.executeUpdate(createTable);
+        logger.debug("Table  created successfully...");
+      } catch (Throwable e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+
+    /**
+     * Inserts one row per entry of the fixture arrays.
+     */
+    private void insertRecordsInTable()
+    {
+      String tbName = dbloader.getTableName();
+
+      // try-with-resources so the statement is closed even when an insert fails
+      try (Statement stmt = dbloader.getConnection().createStatement()) {
+        for (int i = 0; i < id.length; i++) {
+          String sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
+              "VALUES (" + id[i] + ", '" + name[i] + "', " + age[i] + ", '" + address[i] + "', " + salary[i] + " );";
+          stmt.executeUpdate(sql);
+        }
+      } catch (Throwable e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+
+    /**
+     * Drops the table created by {@link #createTable()}.
+     */
+    private void cleanTable()
+    {
+      // use the accessor, like the other helpers, rather than reading the field directly
+      String sql = "DROP TABLE " + dbloader.getTableName();
+      try (Statement stmt = dbloader.getConnection().createStatement()) {
+        stmt.executeUpdate(sql);
+        logger.debug("Table deleted successfully...");
+      } catch (SQLException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      cleanTable();
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  /**
+   * Looks up ID 4 and checks the NAME, AGE and ADDRESS columns that were requested.
+   */
+  @Test
+  public void testMysqlDBLookup() throws Exception
+  {
+    CountDownLatch latch = new CountDownLatch(1);
+
+    ArrayList<FieldInfo> lookupKeys = new ArrayList<>();
+    lookupKeys.add(new FieldInfo("ID", "ID", FieldInfo.SupportType.INTEGER));
+    ArrayList<FieldInfo> includeKeys = new ArrayList<>();
+    includeKeys.add(new FieldInfo("NAME", "NAME", FieldInfo.SupportType.STRING));
+    includeKeys.add(new FieldInfo("AGE", "AGE", FieldInfo.SupportType.INTEGER));
+    includeKeys.add(new FieldInfo("ADDRESS", "ADDRESS", FieldInfo.SupportType.STRING));
+
+    testMeta.dbloader.setFieldInfo(lookupKeys, includeKeys);
+
+    // nothing counts this latch down, so the call simply waits out the one-second timeout
+    latch.await(1000, TimeUnit.MILLISECONDS);
+
+    ArrayList<Object> keys = new ArrayList<>();
+    keys.add(4);
+
+    ArrayList<Object> columnInfo = (ArrayList<Object>)testMeta.dbloader.get(keys);
+
+    Assert.assertEquals("NAME", "Mark", columnInfo.get(0).toString().trim());
+    Assert.assertEquals("AGE", 25, columnInfo.get(1));
+    Assert.assertEquals("ADDRESS", "Rich-Mond", columnInfo.get(2).toString().trim());
+  }
+
+  /**
+   * Looks up ID 4 with every column of the table listed as an include field.
+   */
+  @Test
+  public void testMysqlDBLookupIncludeAllKeys() throws Exception
+  {
+    CountDownLatch latch = new CountDownLatch(1);
+
+    ArrayList<FieldInfo> lookupKeys = new ArrayList<>();
+    lookupKeys.add(new FieldInfo("ID", "ID", FieldInfo.SupportType.INTEGER));
+
+    ArrayList<FieldInfo> includeKeys = new ArrayList<>();
+    includeKeys.add(new FieldInfo("ID", "ID", FieldInfo.SupportType.INTEGER));
+    includeKeys.add(new FieldInfo("NAME", "NAME", FieldInfo.SupportType.STRING));
+    includeKeys.add(new FieldInfo("AGE", "AGE", FieldInfo.SupportType.INTEGER));
+    includeKeys.add(new FieldInfo("ADDRESS", "ADDRESS", FieldInfo.SupportType.STRING));
+    includeKeys.add(new FieldInfo("SALARY", "SALARY", FieldInfo.SupportType.DOUBLE));
+
+    testMeta.dbloader.setFieldInfo(lookupKeys, includeKeys);
+
+    // nothing counts this latch down, so the call simply waits out the one-second timeout
+    latch.await(1000, TimeUnit.MILLISECONDS);
+
+    ArrayList<Object> keys = new ArrayList<>();
+    keys.add(4);
+
+    ArrayList<Object> columnInfo = (ArrayList<Object>)testMeta.dbloader.get(keys);
+
+    Assert.assertEquals("ID", 4, columnInfo.get(0));
+    Assert.assertEquals("NAME", "Mark", columnInfo.get(1).toString().trim());
+    Assert.assertEquals("AGE", 25, columnInfo.get(2));
+    Assert.assertEquals("ADDRESS", "Rich-Mond", columnInfo.get(3).toString().trim());
+    Assert.assertEquals("SALARY", 65000.0, columnInfo.get(4));
+  }
+
+  /**
+   * Runs a custom two-parameter query (AGE and ADDRESS) instead of a key lookup.
+   */
+  @Test
+  public void testMysqlDBQuery() throws Exception
+  {
+    CountDownLatch latch = new CountDownLatch(1);
+
+    testMeta.dbloader
+        .setQueryStmt("Select id, name from " + testMeta.dbloader.getTableName() + " where AGE = ? and ADDRESS = ?");
+
+    // nothing counts this latch down, so the call simply waits out the one-second timeout
+    latch.await(1000, TimeUnit.MILLISECONDS);
+
+    ArrayList<FieldInfo> includeKeys = new ArrayList<>();
+    includeKeys.add(new FieldInfo("ID", "ID", FieldInfo.SupportType.INTEGER));
+    includeKeys.add(new FieldInfo("NAME", "NAME", FieldInfo.SupportType.STRING));
+
+    testMeta.dbloader.setFieldInfo(null, includeKeys);
+
+    ArrayList<Object> keys = new ArrayList<>();
+    keys.add(25);
+    keys.add("Texas");
+
+    ArrayList<Object> columnInfo = (ArrayList<Object>)testMeta.dbloader.get(keys);
+
+    Assert.assertEquals("ID", 2, columnInfo.get(0));
+    Assert.assertEquals("NAME", "Allen", columnInfo.get(1).toString().trim());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java
new file mode 100644
index 0000000..7323f0d
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests {@link MapEnricher} against an in-memory {@link BackendLoader} stub.
+ */
+public class MapEnricherTest
+{
+  /**
+   * With no include fields set, expects A, B, C and In1 from the store merged into the tuple.
+   */
+  @Test
+  public void includeAllKeys()
+  {
+    MapEnricher oper = new MapEnricher();
+    oper.setStore(new MemoryStore());
+    oper.setLookupFields(Arrays.asList("In1"));
+    oper.setup(null);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    TestUtils.setSink(oper.output, sink);
+
+    Map<String, Object> inMap = Maps.newHashMap();
+    inMap.put("In1", "Value1");
+    inMap.put("In2", "Value2");
+
+    oper.activate(null);
+    oper.beginWindow(1);
+    oper.input.process(inMap);
+    oper.endWindow();
+    oper.deactivate();
+
+    Assert.assertEquals("includeAllKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
+    Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, C=Val_C, In2=Value2, In1=Value3}",
+        sink.collectedTuples.get(0).toString());
+  }
+
+  /**
+   * With include fields A and B, expects only those two store values added to the tuple.
+   */
+  @Test
+  public void includeSelectedKeys()
+  {
+    MapEnricher oper = new MapEnricher();
+    oper.setStore(new MemoryStore());
+    oper.setLookupFields(Arrays.asList("In1"));
+    oper.setIncludeFields(Arrays.asList("A", "B"));
+    oper.setup(null);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    TestUtils.setSink(oper.output, sink);
+
+    Map<String, Object> inMap = Maps.newHashMap();
+    inMap.put("In1", "Value1");
+    inMap.put("In2", "Value2");
+
+    oper.activate(null);
+    oper.beginWindow(1);
+    oper.input.process(inMap);
+    oper.endWindow();
+    oper.deactivate();
+
+    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
+    Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, In2=Value2, In1=Value1}",
+        sink.collectedTuples.get(0).toString());
+  }
+
+  /**
+   * Runs {@link EnrichApplication} in local mode for ten seconds.
+   */
+  @Test
+  public void testApplication() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new EnrichApplication(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.run(10000);// runs for 10 seconds and quits
+  }
+
+  /**
+   * Wires RandomMapGenerator -> MapEnricher -> ConsoleOutputOperator.
+   */
+  public static class EnrichApplication implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration configuration)
+    {
+      RandomMapGenerator input = dag.addOperator("Input", RandomMapGenerator.class);
+      MapEnricher enrich = dag.addOperator("Enrich", MapEnricher.class);
+      ConsoleOutputOperator console = dag.addOperator("Console", ConsoleOutputOperator.class);
+      console.setSilent(true);
+
+      List<String> includeFields = new ArrayList<>();
+      includeFields.add("A");
+      includeFields.add("B");
+      List<String> lookupFields = new ArrayList<>();
+      lookupFields.add("In1");
+
+      enrich.setStore(new MemoryStore());
+      enrich.setIncludeFields(includeFields);
+      enrich.setLookupFields(lookupFields);
+
+      dag.addStream("S1", input.output, enrich.input);
+      dag.addStream("S2", enrich.output, console.input);
+    }
+  }
+
+  /**
+   * Emits the same map on every call; {@code key} is never modified, so it is always
+   * {In1=Value1, In2=Value3}.
+   */
+  public static class RandomMapGenerator extends BaseOperator implements InputOperator
+  {
+    private int key = 0;
+
+    public final transient DefaultOutputPort output = new DefaultOutputPort();
+
+    @Override
+    public void emitTuples()
+    {
+      Map<String, String> map = new HashMap<>();
+      map.put("In" + (key + 1), "Value" + (key + 1));
+      map.put("In2", "Value3");
+      output.emit(map);
+    }
+  }
+
+  /**
+   * {@link BackendLoader} stub serving two fixed rows keyed by "Value1" and "Value2".
+   */
+  private static class MemoryStore implements BackendLoader
+  {
+    static final Map<String, Map<String, String>> returnData = Maps.newHashMap();
+    private List<FieldInfo> includeFieldInfo;
+
+    static {
+      Map<String, String> map = Maps.newHashMap();
+      map.put("A", "Val_A");
+      map.put("B", "Val_B");
+      map.put("C", "Val_C");
+      map.put("In1", "Value3");
+      returnData.put("Value1", map);
+
+      map = Maps.newHashMap();
+      map.put("A", "Val_A_1");
+      map.put("B", "Val_B_1");
+      map.put("C", "Val_C");
+      map.put("In1", "Value3");
+      returnData.put("Value2", map);
+    }
+
+    @Override
+    public Map<Object, Object> loadInitialData()
+    {
+      return null;
+    }
+
+    /**
+     * Returns the values of the row selected by the first element of {@code key}, ordered by the
+     * include fields. When none were set, the columns of the first row looked up become that list.
+     */
+    @Override
+    public Object get(Object key)
+    {
+      List<String> keyList = (List<String>)key;
+      Map<String, String> keyValue = returnData.get(keyList.get(0));
+      ArrayList<Object> lst = new ArrayList<>();
+
+      if (CollectionUtils.isEmpty(includeFieldInfo)) {
+        if (includeFieldInfo == null) {
+          includeFieldInfo = new ArrayList<>();
+        }
+        for (Map.Entry<String, String> entry : keyValue.entrySet()) {
+          // TODO: Identify the types..
+          includeFieldInfo.add(new FieldInfo(entry.getKey(), entry.getKey(), FieldInfo.SupportType.OBJECT));
+        }
+      }
+
+      for (FieldInfo fieldInfo : includeFieldInfo) {
+        lst.add(keyValue.get(fieldInfo.getColumnName()));
+      }
+
+      return lst;
+    }
+
+    @Override
+    public List<Object> getAll(List<Object> keys)
+    {
+      return null;
+    }
+
+    @Override
+    public void put(Object key, Object value)
+    {
+
+    }
+
+    @Override
+    public void putAll(Map<Object, Object> m)
+    {
+
+    }
+
+    @Override
+    public void remove(Object key)
+    {
+
+    }
+
+    @Override
+    public void connect() throws IOException
+    {
+
+    }
+
+    @Override
+    public void disconnect() throws IOException
+    {
+
+    }
+
+    @Override
+    public boolean isConnected()
+    {
+      return false;
+    }
+
+    @Override
+    public void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo)
+    {
+      this.includeFieldInfo = includeFieldInfo;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/Order.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/Order.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/Order.java
new file mode 100644
index 0000000..52aa698
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/Order.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+/**
+ * Order tuple used as the input type of the POJO enricher tests. {@code ID} is the field those
+ * tests configure as the lookup key.
+ */
+public class Order
+{
+  public int OID;
+  public int ID;
+  public double amount;
+
+  /**
+   * No-argument constructor, kept for Kryo.
+   */
+  public Order()
+  {
+  }
+
+  /**
+   * Creates an order with every field populated.
+   */
+  public Order(int oid, int id, double amount)
+  {
+    this.amount = amount;
+    this.ID = id;
+    this.OID = oid;
+  }
+
+  public int getOID()
+  {
+    return this.OID;
+  }
+
+  public int getID()
+  {
+    return this.ID;
+  }
+
+  public double getAmount()
+  {
+    return this.amount;
+  }
+
+  public void setOID(int OID)
+  {
+    this.OID = OID;
+  }
+
+  public void setID(int ID)
+  {
+    this.ID = ID;
+  }
+
+  public void setAmount(double amount)
+  {
+    this.amount = amount;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java
new file mode 100644
index 0000000..7a6bc27
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests {@link POJOEnricher} with the database fixture inherited from {@link JDBCLoaderTest}.
+ */
+public class POJOEnricherTest extends JDBCLoaderTest
+{
+  /**
+   * Enriches an order with ID 4 using NAME, AGE and ADDRESS only; SALARY is expected to stay at
+   * its default of 0.0.
+   */
+  @Test
+  public void includeSelectedKeys()
+  {
+    POJOEnricher oper = new POJOEnricher();
+    oper.setStore(testMeta.dbloader);
+    oper.setLookupFields(Arrays.asList("ID"));
+    oper.setIncludeFields(Arrays.asList("NAME", "AGE", "ADDRESS"));
+    oper.outputClass = EmployeeOrder.class;
+    oper.inputClass = Order.class;
+    oper.setup(null);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    TestUtils.setSink(oper.output, sink);
+
+    oper.activate(null);
+
+    oper.beginWindow(1);
+    Order tuple = new Order(3, 4, 700);
+    oper.input.process(tuple);
+    oper.endWindow();
+
+    oper.deactivate();
+
+    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
+    Assert.assertEquals("Ouput Tuple: ",
+        "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=0.0}",
+        sink.collectedTuples.get(0).toString());
+  }
+
+  /**
+   * Enriches an order with ID 4 using NAME, AGE, ADDRESS and SALARY.
+   */
+  @Test
+  public void includeAllKeys()
+  {
+    POJOEnricher oper = new POJOEnricher();
+    oper.setStore(testMeta.dbloader);
+    oper.setLookupFields(Arrays.asList("ID"));
+    oper.setIncludeFields(Arrays.asList("NAME", "AGE", "ADDRESS", "SALARY"));
+    oper.outputClass = EmployeeOrder.class;
+    oper.inputClass = Order.class;
+    oper.setup(null);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    TestUtils.setSink(oper.output, sink);
+
+    oper.activate(null);
+
+    oper.beginWindow(1);
+    Order tuple = new Order(3, 4, 700);
+    oper.input.process(tuple);
+    oper.endWindow();
+
+    oper.deactivate();
+
+    Assert.assertEquals("includeAllKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
+    Assert.assertEquals("Ouput Tuple: ",
+        "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=65000.0}",
+        sink.collectedTuples.get(0).toString());
+  }
+
+  /**
+   * Runs {@link EnrichApplication} in local mode for ten seconds.
+   */
+  @Test
+  public void testApplication() throws Exception
+  {
+    EnrichApplication enrichApplication = new EnrichApplication(testMeta);
+    enrichApplication.setLoader(testMeta.dbloader);
+
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(enrichApplication, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.run(10000);// runs for 10 seconds and quits
+  }
+
+  /**
+   * Wires RandomPOJOGenerator -> POJOEnricher -> EnrichVerifier.
+   */
+  public static class EnrichApplication implements StreamingApplication
+  {
+    private final TestMeta testMeta;
+    BackendLoader loader;
+
+    public EnrichApplication(TestMeta testMeta)
+    {
+      this.testMeta = testMeta;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration configuration)
+    {
+      RandomPOJOGenerator input = dag.addOperator("Input", RandomPOJOGenerator.class);
+      POJOEnricher enrich = dag.addOperator("Enrich", POJOEnricher.class);
+      EnrichVerifier verify = dag.addOperator("Verify", EnrichVerifier.class);
+      verify.address = testMeta.address;
+      verify.age = testMeta.age;
+      verify.names = testMeta.name;
+      verify.salary = testMeta.salary;
+
+      enrich.setStore(loader);
+      ArrayList<String> lookupFields = new ArrayList<>();
+      lookupFields.add("ID");
+      ArrayList<String> includeFields = new ArrayList<>();
+      includeFields.add("NAME");
+      includeFields.add("AGE");
+      includeFields.add("ADDRESS");
+      includeFields.add("SALARY");
+      enrich.setLookupFields(lookupFields);
+      enrich.setIncludeFields(includeFields);
+
+      dag.getMeta(enrich).getMeta(enrich.input).getAttributes().put(Context.PortContext.TUPLE_CLASS, Order.class);
+      dag.getMeta(enrich).getMeta(enrich.output).getAttributes()
+          .put(Context.PortContext.TUPLE_CLASS, EmployeeOrder.class);
+
+      dag.addStream("S1", input.output, enrich.input);
+      dag.addStream("S2", enrich.output, verify.input);
+    }
+
+    public void setLoader(BackendLoader loader)
+    {
+      this.loader = loader;
+    }
+  }
+
+  /**
+   * Emits {@link Order} tuples whose IDs cycle through 1, 2, 3, 4; after ID 4 it stays silent
+   * until the next {@code beginWindow}.
+   */
+  public static class RandomPOJOGenerator implements InputOperator
+  {
+    public transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
+    private int idx = 0;
+    private boolean emit = true;
+
+    @Override
+    public void emitTuples()
+    {
+      if (!emit) {
+        return;
+      }
+      Order o = new Order(1, idx + 1, 100.00);
+      output.emit(o);
+      if (idx == 3) {
+        emit = false;
+      }
+      // Advance after emitting. The previous `idx += idx++ % 4` left idx at 0 forever
+      // (0 + 0 % 4), so only ID 1 was ever emitted and the idx == 3 check never fired.
+      idx = (idx + 1) % 4;
+    }
+
+    @Override
+    public void beginWindow(long l)
+    {
+      emit = true;
+    }
+
+    @Override
+    public void endWindow()
+    {
+
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+    }
+
+    @Override
+    public void teardown()
+    {
+    }
+  }
+
+  /**
+   * Asserts that each received {@link EmployeeOrder} matches the fixture row selected by its ID.
+   */
+  public static class EnrichVerifier extends BaseOperator
+  {
+    private static final Logger logger = LoggerFactory.getLogger(EnrichVerifier.class);
+    String[] names;
+    int[] age;
+    String[] address;
+    double[] salary;
+
+    private transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(Object o)
+      {
+        Assert.assertTrue(o instanceof EmployeeOrder);
+        EmployeeOrder order = (EmployeeOrder)o;
+        int id = order.getID();
+        Assert.assertTrue(id >= 1 && id <= 4);
+        Assert.assertEquals(1, order.getOID());
+        Assert.assertEquals(100.00, order.getAmount(), 0);
+
+        Assert.assertEquals(names[id - 1], order.getNAME());
+        Assert.assertEquals(age[id - 1], order.getAGE());
+        Assert.assertEquals(address[id - 1], order.getADDRESS());
+        Assert.assertEquals(salary[id - 1], order.getSALARY(), 0);
+      }
+    };
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java
deleted file mode 100644
index 308aa82..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class BeanEnrichmentOperatorTest extends JDBCLoaderTest
-{
-  public class Order {
-    public int OID;
-    public int ID;
-    public double amount;
-
-    public Order(int oid, int id, double amount) {
-      this.OID = oid;
-      this.ID = id;
-      this.amount = amount;
-    }
-    public int getOID()
-    {
-      return OID;
-    }
-
-    public void setOID(int OID)
-    {
-      this.OID = OID;
-    }
-
-    public int getID()
-    {
-      return ID;
-    }
-
-    public void setID(int ID)
-    {
-      this.ID = ID;
-    }
-
-    public double getAmount()
-    {
-      return amount;
-    }
-
-    public void setAmount(double amount)
-    {
-      this.amount = amount;
-    }
-  }
-
-
-  @Test
-  public void includeSelectedKeys()
-  {
-    POJOEnrichmentOperator oper = new POJOEnrichmentOperator();
-    oper.setStore(testMeta.dbloader);
-    oper.setLookupFieldsStr("ID");
-    oper.setIncludeFieldsStr("NAME,AGE,ADDRESS");
-    oper.outputClass = EmployeeOrder.class;
-    oper.setup(null);
-
-    CollectorTestSink sink = new CollectorTestSink();
-    TestUtils.setSink(oper.output, sink);
-
-    oper.beginWindow(1);
-    Order tuple = new Order(3, 4, 700);
-    oper.input.process(tuple);
-    oper.endWindow();
-
-    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
-    Assert.assertEquals("Ouput Tuple: ", "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=0.0}", sink.collectedTuples.get(0).toString());
-  }
-  @Test
-  public void includeAllKeys()
-  {
-    POJOEnrichmentOperator oper = new POJOEnrichmentOperator();
-    oper.setStore(testMeta.dbloader);
-    oper.setLookupFieldsStr("ID");
-    oper.outputClass = EmployeeOrder.class;
-    oper.setup(null);
-
-    CollectorTestSink sink = new CollectorTestSink();
-    TestUtils.setSink(oper.output, sink);
-
-
-    oper.beginWindow(1);
-    Order tuple = new Order(3, 4, 700);
-    oper.input.process(tuple);
-    oper.endWindow();
-
-    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
-    Assert.assertEquals("Ouput Tuple: ", "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=65000.0}", sink.collectedTuples.get(0).toString());
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java
deleted file mode 100644
index 00c9d82..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-// This class is needed for Bean Enrichment Operator testing
-public class EmployeeOrder {
-  public int OID;
-  public int ID;
-  public double amount;
-  public String NAME;
-  public int AGE;
-  public String ADDRESS;
-  public double SALARY;
-
-  public int getOID()
-  {
-    return OID;
-  }
-
-  public void setOID(int OID)
-  {
-    this.OID = OID;
-  }
-
-  public int getID()
-  {
-    return ID;
-  }
-
-  public void setID(int ID)
-  {
-    this.ID = ID;
-  }
-
-  public int getAGE()
-  {
-    return AGE;
-  }
-
-  public void setAGE(int AGE)
-  {
-    this.AGE = AGE;
-  }
-
-  public String getNAME()
-  {
-    return NAME;
-  }
-
-  public void setNAME(String NAME)
-  {
-    this.NAME = NAME;
-  }
-
-  public double getAmount()
-  {
-    return amount;
-  }
-
-  public void setAmount(double amount)
-  {
-    this.amount = amount;
-  }
-
-  public String getADDRESS()
-  {
-    return ADDRESS;
-  }
-
-  public void setADDRESS(String ADDRESS)
-  {
-    this.ADDRESS = ADDRESS;
-  }
-
-  public double getSALARY()
-  {
-    return SALARY;
-  }
-
-  public void setSALARY(double SALARY)
-  {
-    this.SALARY = SALARY;
-  }
-
-  @Override public String toString()
-  {
-    return "{" +
-        "OID=" + OID +
-        ", ID=" + ID +
-        ", amount=" + amount +
-        ", NAME='" + NAME + '\'' +
-        ", AGE=" + AGE +
-        ", ADDRESS='" + ADDRESS.trim() + '\'' +
-        ", SALARY=" + SALARY +
-        '}';
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java
deleted file mode 100644
index 934d73b..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-import com.esotericsoftware.kryo.Kryo;
-import com.google.common.collect.Maps;
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.Map;
-
-
-public class FileEnrichmentTest
-{
-
-  @Rule public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();
-
-  @Test public void testEnrichmentOperator() throws IOException, InterruptedException
-  {
-    URL origUrl = this.getClass().getResource("/productmapping.txt");
-
-    URL fileUrl = new URL(this.getClass().getResource("/").toString() + "productmapping1.txt");
-    FileUtils.deleteQuietly(new File(fileUrl.getPath()));
-    FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath()));
-
-    MapEnrichmentOperator oper = new MapEnrichmentOperator();
-    FSLoader store = new FSLoader();
-    store.setFileName(fileUrl.toString());
-    oper.setLookupFieldsStr("productId");
-    oper.setStore(store);
-
-    oper.setup(null);
-
-    /* File contains 6 entries, but operator one entry is duplicate,
-     * so cache should contains only 5 entries after scanning input file.
-     */
-    //Assert.assertEquals("Number of mappings ", 7, oper.cache.size());
-
-    CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<Map<String, Object>>();
-    @SuppressWarnings({ "unchecked", "rawtypes" }) CollectorTestSink<Object> tmp = (CollectorTestSink) sink;
-    oper.output.setSink(tmp);
-
-    oper.beginWindow(0);
-    Map<String, Object> tuple = Maps.newHashMap();
-    tuple.put("productId", 3);
-    tuple.put("channelId", 4);
-    tuple.put("amount", 10.0);
-
-    Kryo kryo = new Kryo();
-    oper.input.process(kryo.copy(tuple));
-
-    oper.endWindow();
-
-    /* Number of tuple, emitted */
-    Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
-    Map<String, Object> emitted = sink.collectedTuples.iterator().next();
-
-    /* The fields present in original event is kept as it is */
-    Assert.assertEquals("Number of fields in emitted tuple", 4, emitted.size());
-    Assert.assertEquals("value of productId is 3", tuple.get("productId"), emitted.get("productId"));
-    Assert.assertEquals("value of channelId is 4", tuple.get("channelId"), emitted.get("channelId"));
-    Assert.assertEquals("value of amount is 10.0", tuple.get("amount"), emitted.get("amount"));
-
-    /* Check if productCategory is added to the event */
-    Assert.assertEquals("productCategory is part of tuple", true, emitted.containsKey("productCategory"));
-    Assert.assertEquals("value of product category is 1", 5, emitted.get("productCategory"));
-
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java
deleted file mode 100644
index 07be982..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.netlet.util.DTThrowable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-import org.slf4j.LoggerFactory;
-
-public class HBaseLoaderTest
-{
-  static final org.slf4j.Logger logger = LoggerFactory.getLogger(HBaseLoaderTest.class);
-
-  public static class TestMeta extends TestWatcher
-  {
-
-    HBaseLoader dbloader;
-    @Override
-    protected void starting(Description description)
-    {
-      try {
-        dbloader = new HBaseLoader();
-        Configuration conf = HBaseConfiguration.create();
-        conf.addResource(new Path("file:///home/chaitanya/hbase-site.xml"));
-
-        dbloader.setConfiguration(conf);
-        dbloader.setZookeeperQuorum("localhost");
-        dbloader.setZookeeperClientPort(2181);
-
-        dbloader.setTableName("EMPLOYEE");
-
-        dbloader.connect();
-        createTable();
-        insertRecordsInTable();
-      }
-      catch (Throwable e) {
-        DTThrowable.rethrow(e);
-      }
-    }
-
-    private void createTable()
-    {
-      try {
-        String[] familys = { "personal", "professional" };
-        HBaseAdmin admin = new HBaseAdmin(dbloader.getConfiguration());
-        HTableDescriptor tableDesc = new HTableDescriptor(dbloader.getTableName());
-        for (int i = 0; i < familys.length; i++) {
-          tableDesc.addFamily(new HColumnDescriptor(familys[i]));
-        }
-        admin.createTable(tableDesc);
-
-        logger.debug("Table  created successfully...");
-      }
-      catch (Throwable e) {
-        DTThrowable.rethrow(e);
-      }
-    }
-
-    @SuppressWarnings("deprecation")
-    public void addRecord(String rowKey, String family, String qualifier, String value) throws Exception {
-      try {
-        HTable table = new HTable(dbloader.getConfiguration(), dbloader.getTableName());
-        Put put = new Put(Bytes.toBytes(rowKey));
-        put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes
-            .toBytes(value));
-        table.put(put);
-      } catch (IOException e) {
-        DTThrowable.rethrow(e);
-      }
-    }
-    private void insertRecordsInTable()
-    {
-      try {
-        addRecord("row1", "personal", "name", "raju");
-        addRecord("row1", "personal", "city", "hyderabad");
-        addRecord("row1", "professional", "designation", "manager");
-        addRecord("row1", "professional", "Salary", "50000");
-
-        addRecord("row2", "personal", "name", "ravi");
-        addRecord("row2", "personal", "city", "Chennai");
-        addRecord("row2", "professional", "designation", "SE");
-        addRecord("row2", "professional", "Salary", "30000");
-
-        addRecord("row3", "personal", "name", "rajesh");
-        addRecord("row3", "personal", "city", "Delhi");
-        addRecord("row3", "professional", "designation", "E");
-        addRecord("row3", "professional", "Salary", "10000");
-      }
-      catch (Throwable e) {
-        DTThrowable.rethrow(e);
-      }
-
-    }
-
-    private void cleanTable()
-    {
-      String sql = "delete from  " + dbloader.getTableName();
-      try {
-        HBaseAdmin admin = new HBaseAdmin(dbloader.getConfiguration());
-        admin.disableTable(dbloader.getTableName());
-        admin.deleteTable(dbloader.getTableName());
-      } catch (MasterNotRunningException e) {
-        e.printStackTrace();
-      } catch (ZooKeeperConnectionException e) {
-        e.printStackTrace();
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      cleanTable();
-    }
-  }
-
-  @Rule
-  public TestMeta testMeta = new TestMeta();
-
-  @Test
-  public void testHBaseLookup() throws Exception
-  {
-    CountDownLatch latch = new CountDownLatch(1);
-
-    ArrayList<String> includeKeys = new ArrayList<String>();
-    includeKeys.add("city");
-    includeKeys.add("Salary");
-    ArrayList<String> lookupKeys = new ArrayList<String>();
-    lookupKeys.add("ID");
-    testMeta.dbloader.setFields(lookupKeys, includeKeys);
-
-    String includeFamilyStr = "personal, professional";
-    testMeta.dbloader.setIncludeFamilyStr(includeFamilyStr);
-
-    latch.await(1000, TimeUnit.MILLISECONDS);
-
-    ArrayList<Object> keys = new ArrayList<Object>();
-    keys.add("row2");
-
-    ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys);
-
-    Assert.assertEquals("CITY", "Chennai", columnInfo.get(0).toString().trim());
-    Assert.assertEquals("Salary", 30000, columnInfo.get(1));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java
deleted file mode 100644
index 72cfb88..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.netlet.util.DTThrowable;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.slf4j.LoggerFactory;
-
-public class JDBCLoaderTest
-{
-  static final org.slf4j.Logger logger = LoggerFactory.getLogger(JDBCLoaderTest.class);
-
-  public static class TestMeta extends TestWatcher
-  {
-    JDBCLoader dbloader;
-    @Override
-    protected void starting(Description description)
-    {
-        try {
-          dbloader = new JDBCLoader();
-          dbloader.setDatabaseDriver("org.hsqldb.jdbcDriver");
-          dbloader.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");
-          dbloader.setTableName("COMPANY");
-
-          dbloader.connect();
-          createTable();
-          insertRecordsInTable();
-        }
-        catch (Throwable e) {
-            DTThrowable.rethrow(e);
-        }
-    }
-
-    private void createTable()
-    {
-        try {
-            Statement stmt = dbloader.getConnection().createStatement();
-
-            String createTable = "CREATE TABLE IF NOT EXISTS " + dbloader.getTableName() +
-                    "(ID INT PRIMARY KEY     NOT NULL," +
-                    " NAME           TEXT    NOT NULL, " +
-                    " AGE            INT     NOT NULL, " +
-                    " ADDRESS        CHAR(50), " +
-                    " SALARY         REAL)";
-            stmt.executeUpdate(createTable);
-
-            logger.debug("Table  created successfully...");
-        }
-        catch (Throwable e) {
-            DTThrowable.rethrow(e);
-        }
-    }
-
-    private void insertRecordsInTable()
-    {
-      try {
-        Statement stmt = dbloader.getConnection().createStatement();
-        String tbName = dbloader.getTableName();
-
-        String sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
-            "VALUES (1, 'Paul', 32, 'California', 20000.00 );";
-        stmt.executeUpdate(sql);
-
-        sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
-            "VALUES (2, 'Allen', 25, 'Texas', 15000.00 );";
-        stmt.executeUpdate(sql);
-
-        sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
-            "VALUES (3, 'Teddy', 23, 'Norway', 20000.00 );";
-        stmt.executeUpdate(sql);
-
-        sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
-            "VALUES (4, 'Mark', 25, 'Rich-Mond', 65000.00 );";
-        stmt.executeUpdate(sql);
-      }
-      catch (Throwable e) {
-        DTThrowable.rethrow(e);
-      }
-
-    }
-
-    private void cleanTable()
-    {
-        String sql = "delete from  " + dbloader.tableName;
-        try {
-          Statement stmt = dbloader.getConnection().createStatement();
-          stmt.executeUpdate(sql);
-          logger.debug("Table deleted successfully...");
-        } catch (SQLException e) {
-          DTThrowable.rethrow(e);
-        }
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      cleanTable();
-    }
-  }
-
-  @Rule
-  public TestMeta testMeta = new TestMeta();
-
-  @Test
-  public void testMysqlDBLookup() throws Exception
-  {
-    CountDownLatch latch = new CountDownLatch(1);
-
-    ArrayList<String> lookupKeys = new ArrayList<String>();
-    lookupKeys.add("ID");
-    ArrayList<String> includeKeys = new ArrayList<String>();
-    includeKeys.add("NAME");
-    includeKeys.add("AGE");
-    includeKeys.add("ADDRESS");
-    testMeta.dbloader.setFields(lookupKeys, includeKeys);
-
-    latch.await(1000, TimeUnit.MILLISECONDS);
-
-    ArrayList<Object> keys = new ArrayList<Object>();
-    keys.add("4");
-
-    ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys);
-
-    Assert.assertEquals("NAME", "Mark", columnInfo.get(0).toString().trim());
-    Assert.assertEquals("AGE", 25, columnInfo.get(1));
-    Assert.assertEquals("ADDRESS", "Rich-Mond", columnInfo.get(2).toString().trim());
-  }
-
-  @Test
-  public void testMysqlDBLookupIncludeAllKeys() throws Exception
-  {
-    CountDownLatch latch = new CountDownLatch(1);
-
-    ArrayList<String> lookupKeys = new ArrayList<String>();
-    lookupKeys.add("ID");
-    ArrayList<String> includeKeys = new ArrayList<String>();
-    testMeta.dbloader.setFields(lookupKeys, includeKeys);
-
-    latch.await(1000, TimeUnit.MILLISECONDS);
-
-    ArrayList<Object> keys = new ArrayList<Object>();
-    keys.add("4");
-
-    ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys);
-
-    Assert.assertEquals("ID", 4, columnInfo.get(0));
-    Assert.assertEquals("NAME", "Mark", columnInfo.get(1).toString().trim());
-    Assert.assertEquals("AGE", 25, columnInfo.get(2));
-    Assert.assertEquals("ADDRESS", "Rich-Mond", columnInfo.get(3).toString().trim());
-    Assert.assertEquals("SALARY", 65000.0, columnInfo.get(4));
-  }
-
-  @Test
-  public void testMysqlDBQuery() throws Exception
-  {
-    CountDownLatch latch = new CountDownLatch(1);
-
-    testMeta.dbloader.setQueryStmt("Select id, name from " + testMeta.dbloader.getTableName() + " where AGE = ? and ADDRESS = ?");
-
-    latch.await(1000, TimeUnit.MILLISECONDS);
-
-    ArrayList<Object> keys = new ArrayList<Object>();
-    keys.add("25");
-    keys.add("Texas");
-
-    ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys);
-
-    Assert.assertEquals("ID", 2, columnInfo.get(0));
-    Assert.assertEquals("NAME", "Allen", columnInfo.get(1).toString().trim());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java
deleted file mode 100644
index 845fe80..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-import com.google.common.collect.Maps;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections.CollectionUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class MapEnrichmentOperatorTest
-{
-  @Test
-  public void includeAllKeys()
-  {
-    MapEnrichmentOperator oper = new MapEnrichmentOperator();
-    oper.setStore(new MemoryStore());
-    oper.setLookupFieldsStr("In1");
-    oper.setup(null);
-
-    CollectorTestSink sink = new CollectorTestSink();
-    TestUtils.setSink(oper.output, sink);
-
-    Map<String, Object> inMap = Maps.newHashMap();
-    inMap.put("In1", "Value1");
-    inMap.put("In2", "Value2");
-
-    oper.beginWindow(1);
-    oper.input.process(inMap);
-    oper.endWindow();
-
-    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
-    Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, C=Val_C, In2=Value2, In1=Value3}", sink.collectedTuples.get(0).toString());
-  }
-
-  @Test
-  public void includeSelectedKeys()
-  {
-    MapEnrichmentOperator oper = new MapEnrichmentOperator();
-    oper.setStore(new MemoryStore());
-    oper.setLookupFieldsStr("In1");
-    oper.setIncludeFieldsStr("A,B");
-    oper.setup(null);
-
-    CollectorTestSink sink = new CollectorTestSink();
-    TestUtils.setSink(oper.output, sink);
-
-    Map<String, Object> inMap = Maps.newHashMap();
-    inMap.put("In1", "Value1");
-    inMap.put("In2", "Value2");
-
-    oper.beginWindow(1);
-    oper.input.process(inMap);
-    oper.endWindow();
-
-    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
-    Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, In2=Value2, In1=Value1}", sink.collectedTuples.get(0).toString());
-  }
-
-  private static class MemoryStore implements EnrichmentBackup
-  {
-    static Map<String, Map> returnData = Maps.newHashMap();
-    private List<String> includeFields;
-    static {
-      Map<String, String> map = Maps.newHashMap();
-      map.put("A", "Val_A");
-      map.put("B", "Val_B");
-      map.put("C", "Val_C");
-      map.put("In1", "Value3");
-      returnData.put("Value1", map);
-
-      map = Maps.newHashMap();
-      map.put("A", "Val_A_1");
-      map.put("B", "Val_B_1");
-      map.put("C", "Val_C");
-      map.put("In1", "Value3");
-      returnData.put("Value2", map);
-    }
-
-    @Override public Map<Object, Object> loadInitialData()
-    {
-      return null;
-    }
-
-    @Override public Object get(Object key)
-    {
-      List<String> keyList = (List<String>)key;
-      Map<String, String> keyValue = returnData.get(keyList.get(0));
-      ArrayList<Object> lst = new ArrayList<Object>();
-      if(CollectionUtils.isEmpty(includeFields)) {
-        if(includeFields == null)
-          includeFields = new ArrayList<String>();
-        for (Map.Entry<String, String> e : keyValue.entrySet()) {
-          includeFields.add(e.getKey());
-        }
-      }
-      for(String field : includeFields) {
-        lst.add(keyValue.get(field));
-      }
-      return lst;
-    }
-
-    @Override public List<Object> getAll(List<Object> keys)
-    {
-      return null;
-    }
-
-    @Override public void put(Object key, Object value)
-    {
-
-    }
-
-    @Override public void putAll(Map<Object, Object> m)
-    {
-
-    }
-
-    @Override public void remove(Object key)
-    {
-
-    }
-
-    @Override public void connect() throws IOException
-    {
-
-    }
-
-    @Override public void disconnect() throws IOException
-    {
-
-    }
-
-    @Override public boolean isConnected()
-    {
-      return false;
-    }
-
-    @Override public void setFields(List<String> lookupFields, List<String> includeFields)
-    {
-      this.includeFields = includeFields;
-
-    }
-
-    @Override
-    public boolean needRefresh() {
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/resources/productmapping.txt
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/productmapping.txt b/contrib/src/test/resources/productmapping.txt
new file mode 100755
index 0000000..ece99ff
--- /dev/null
+++ b/contrib/src/test/resources/productmapping.txt
@@ -0,0 +1,100 @@
+{"productCategory": 5, "productId": 0}
+{"productCategory": 4, "productId": 1}
+{"productCategory": 5, "productId": 2}
+{"productCategory": 5, "productId": 3}
+{"productCategory": 5, "productId": 4}
+{"productCategory": 1, "productId": 5}
+{"productCategory": 2, "productId": 6}
+{"productCategory": 4, "productId": 7}
+{"productCategory": 2, "productId": 8}
+{"productCategory": 3, "productId": 9}
+{"productCategory": 1, "productId": 10}
+{"productCategory": 5, "productId": 11}
+{"productCategory": 5, "productId": 12}
+{"productCategory": 1, "productId": 13}
+{"productCategory": 1, "productId": 14}
+{"productCategory": 2, "productId": 15}
+{"productCategory": 3, "productId": 16}
+{"productCategory": 5, "productId": 17}
+{"productCategory": 2, "productId": 18}
+{"productCategory": 2, "productId": 19}
+{"productCategory": 2, "productId": 20}
+{"productCategory": 3, "productId": 21}
+{"productCategory": 2, "productId": 22}
+{"productCategory": 5, "productId": 23}
+{"productCategory": 4, "productId": 24}
+{"productCategory": 1, "productId": 25}
+{"productCategory": 3, "productId": 26}
+{"productCategory": 3, "productId": 27}
+{"productCategory": 3, "productId": 28}
+{"productCategory": 5, "productId": 29}
+{"productCategory": 2, "productId": 30}
+{"productCategory": 3, "productId": 31}
+{"productCategory": 3, "productId": 32}
+{"productCategory": 3, "productId": 33}
+{"productCategory": 1, "productId": 34}
+{"productCategory": 3, "productId": 35}
+{"productCategory": 2, "productId": 36}
+{"productCategory": 1, "productId": 37}
+{"productCategory": 3, "productId": 38}
+{"productCategory": 2, "productId": 39}
+{"productCategory": 1, "productId": 40}
+{"productCategory": 5, "productId": 41}
+{"productCategory": 3, "productId": 42}
+{"productCategory": 5, "productId": 43}
+{"productCategory": 2, "productId": 44}
+{"productCategory": 4, "productId": 45}
+{"productCategory": 5, "productId": 46}
+{"productCategory": 2, "productId": 47}
+{"productCategory": 3, "productId": 48}
+{"productCategory": 5, "productId": 49}
+{"productCategory": 5, "productId": 50}
+{"productCategory": 4, "productId": 51}
+{"productCategory": 5, "productId": 52}
+{"productCategory": 1, "productId": 53}
+{"productCategory": 5, "productId": 54}
+{"productCategory": 4, "productId": 55}
+{"productCategory": 4, "productId": 56}
+{"productCategory": 2, "productId": 57}
+{"productCategory": 4, "productId": 58}
+{"productCategory": 4, "productId": 59}
+{"productCategory": 4, "productId": 60}
+{"productCategory": 1, "productId": 61}
+{"productCategory": 2, "productId": 62}
+{"productCategory": 3, "productId": 63}
+{"productCategory": 5, "productId": 64}
+{"productCategory": 1, "productId": 65}
+{"productCategory": 5, "productId": 66}
+{"productCategory": 5, "productId": 67}
+{"productCategory": 2, "productId": 68}
+{"productCategory": 3, "productId": 69}
+{"productCategory": 3, "productId": 70}
+{"productCategory": 2, "productId": 71}
+{"productCategory": 3, "productId": 72}
+{"productCategory": 4, "productId": 73}
+{"productCategory": 2, "productId": 74}
+{"productCategory": 3, "productId": 75}
+{"productCategory": 3, "productId": 76}
+{"productCategory": 4, "productId": 77}
+{"productCategory": 5, "productId": 78}
+{"productCategory": 4, "productId": 79}
+{"productCategory": 1, "productId": 80}
+{"productCategory": 1, "productId": 81}
+{"productCategory": 1, "productId": 82}
+{"productCategory": 3, "productId": 83}
+{"productCategory": 1, "productId": 84}
+{"productCategory": 5, "productId": 85}
+{"productCategory": 3, "productId": 86}
+{"productCategory": 4, "productId": 87}
+{"productCategory": 1, "productId": 88}
+{"productCategory": 5, "productId": 89}
+{"productCategory": 3, "productId": 90}
+{"productCategory": 5, "productId": 91}
+{"productCategory": 2, "productId": 92}
+{"productCategory": 2, "productId": 93}
+{"productCategory": 3, "productId": 94}
+{"productCategory": 1, "productId": 95}
+{"productCategory": 1, "productId": 96}
+{"productCategory": 5, "productId": 97}
+{"productCategory": 3, "productId": 98}
+{"productCategory": 5, "productId": 99}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java b/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
index e07bd04..d70ca0c 100644
--- a/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
+++ b/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
@@ -20,6 +20,8 @@ package com.datatorrent.lib.util;
 
 import javax.validation.constraints.NotNull;
 
+import org.apache.commons.lang3.ClassUtils;
+
 @SuppressWarnings("rawtypes")
 /**
  * @since 3.3.0
@@ -114,7 +116,7 @@ public class FieldInfo
   }
 
   public static enum SupportType {
-    BOOLEAN(Boolean.class), SHORT(Short.class), INTEGER(Integer.class), LONG(Long.class), FLOAT(Float.class), DOUBLE(Double.class), STRING(String.class);
+    BOOLEAN(Boolean.class), SHORT(Short.class), INTEGER(Integer.class), LONG(Long.class), FLOAT(Float.class), DOUBLE(Double.class), STRING(String.class), OBJECT(Object.class);
 
     private Class javaType;
 
@@ -127,6 +129,17 @@ public class FieldInfo
     {
       return javaType;
     }
+
+    /** Maps a Java class (primitive or wrapper) to its SupportType; returns OBJECT when no constant matches. */
+    public static SupportType getFromJavaType(Class type)
+    {
+      for (SupportType supportType : SupportType.values()) {
+        if (supportType.getJavaType() == ClassUtils.primitiveToWrapper(type)) {
+          return supportType;
+        }
+      }
+      return OBJECT;
+    }
   }
 
 }