You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sa...@apache.org on 2016/05/10 09:05:56 UTC
[2/4] incubator-apex-malhar git commit: APEXMALHAR-2023 Adding
Enrichment Operator to Malhar
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/JDBCLoaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/JDBCLoaderTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/JDBCLoaderTest.java
new file mode 100644
index 0000000..f450736
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/JDBCLoaderTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.netlet.util.DTThrowable;
+
+public class JDBCLoaderTest
+{
+ static final Logger logger = LoggerFactory.getLogger(JDBCLoaderTest.class);
+
+  /**
+   * JUnit rule that prepares the database fixture shared by the tests in this class.
+   * <p>
+   * When a test starts it connects a {@link JDBCLoader} to an in-memory HSQLDB database, creates the
+   * {@code COMPANY} table and inserts the four employee rows described by the parallel arrays below;
+   * when the test finishes it drops the table again.
+   */
+  public static class TestMeta extends TestWatcher
+  {
+    JDBCLoader dbloader;
+    // Parallel arrays: index i across all five arrays describes one row of the COMPANY table.
+    int[] id = {1, 2, 3, 4};
+    String[] name = {"Paul", "Allen", "Teddy", "Mark"};
+    int[] age = {32, 25, 23, 25};
+    String[] address = {"California", "Texas", "Norway", "Rich-Mond"};
+    double[] salary = {20000.00, 15000.00, 20000.00, 65000.00};
+
+    /**
+     * Configures and connects the loader, then creates and populates the table.
+     * Any failure is rethrown through {@link DTThrowable#rethrow(Throwable)}.
+     */
+    @Override
+    protected void starting(Description description)
+    {
+      try {
+        dbloader = new JDBCLoader();
+        dbloader.setDatabaseDriver("org.hsqldb.jdbcDriver");
+        dbloader.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");
+        dbloader.setTableName("COMPANY");
+
+        dbloader.connect();
+        createTable();
+        insertRecordsInTable();
+      } catch (Throwable e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+
+    /**
+     * Creates the table named by the loader with columns ID, NAME, AGE, ADDRESS and SALARY.
+     */
+    private void createTable()
+    {
+      // try-with-resources closes the statement even when the DDL fails.
+      try (Statement stmt = dbloader.getConnection().createStatement()) {
+        String createTable = "CREATE TABLE " + dbloader.getTableName() + " " +
+            "(ID INT PRIMARY KEY, " +
+            "NAME CHAR(50), " +
+            "AGE INT, " +
+            "ADDRESS CHAR(50), " +
+            "SALARY REAL)";
+        logger.debug(createTable);
+        stmt.executeUpdate(createTable);
+
+        logger.debug("Table created successfully...");
+      } catch (Throwable e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+
+    /**
+     * Inserts one row per index of the fixture arrays.
+     */
+    private void insertRecordsInTable()
+    {
+      // A single statement is reused for all inserts and closed once the loop is done.
+      try (Statement stmt = dbloader.getConnection().createStatement()) {
+        String tbName = dbloader.getTableName();
+
+        for (int i = 0; i < id.length; i++) {
+          String sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
+              "VALUES (" + id[i] + ", '" + name[i] + "', " + age[i] + ", '" + address[i] + "', " + salary[i] + " );";
+          stmt.executeUpdate(sql);
+        }
+      } catch (Throwable e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+
+    /**
+     * Drops the fixture table so the next test can recreate it in the shared in-memory database.
+     */
+    private void cleanTable()
+    {
+      // Uses the getter, like createTable() and insertRecordsInTable(), instead of the raw field.
+      String sql = "DROP TABLE " + dbloader.getTableName();
+      try (Statement stmt = dbloader.getConnection().createStatement()) {
+        stmt.executeUpdate(sql);
+        logger.debug("Table deleted successfully...");
+      } catch (SQLException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+
+    /**
+     * Drops the table after the test.
+     * NOTE(review): the loader's connection is not closed here; confirm whether JDBCLoader needs an
+     * explicit disconnect.
+     */
+    @Override
+    protected void finished(Description description)
+    {
+      cleanTable();
+    }
+  }
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+  /**
+   * Looks up the row with ID 4 using NAME, AGE and ADDRESS as include columns and checks the
+   * values returned by the loader, in include order.
+   */
+  @Test
+  public void testMysqlDBLookup() throws Exception
+  {
+    CountDownLatch pause = new CountDownLatch(1);
+
+    ArrayList<FieldInfo> included = new ArrayList<>();
+    included.add(new FieldInfo("NAME", "NAME", FieldInfo.SupportType.STRING));
+    included.add(new FieldInfo("AGE", "AGE", FieldInfo.SupportType.INTEGER));
+    included.add(new FieldInfo("ADDRESS", "ADDRESS", FieldInfo.SupportType.STRING));
+
+    ArrayList<FieldInfo> lookups = new ArrayList<>();
+    lookups.add(new FieldInfo("ID", "ID", FieldInfo.SupportType.INTEGER));
+
+    testMeta.dbloader.setFieldInfo(lookups, included);
+
+    // Nothing counts this latch down, so the await simply times out after one second.
+    pause.await(1000, TimeUnit.MILLISECONDS);
+
+    ArrayList<Object> lookupValues = new ArrayList<>();
+    lookupValues.add(4);
+
+    ArrayList<Object> row = (ArrayList<Object>)testMeta.dbloader.get(lookupValues);
+
+    // String columns are trimmed before comparison.
+    Assert.assertEquals("NAME", "Mark", row.get(0).toString().trim());
+    Assert.assertEquals("AGE", 25, row.get(1));
+    Assert.assertEquals("ADDRESS", "Rich-Mond", row.get(2).toString().trim());
+  }
+
+  /**
+   * Looks up the row with ID 4 while including every column of the table (ID, NAME, AGE, ADDRESS,
+   * SALARY) and checks each returned value.
+   */
+  @Test
+  public void testMysqlDBLookupIncludeAllKeys() throws Exception
+  {
+    CountDownLatch pause = new CountDownLatch(1);
+
+    ArrayList<FieldInfo> included = new ArrayList<>();
+    included.add(new FieldInfo("ID", "ID", FieldInfo.SupportType.INTEGER));
+    included.add(new FieldInfo("NAME", "NAME", FieldInfo.SupportType.STRING));
+    included.add(new FieldInfo("AGE", "AGE", FieldInfo.SupportType.INTEGER));
+    included.add(new FieldInfo("ADDRESS", "ADDRESS", FieldInfo.SupportType.STRING));
+    included.add(new FieldInfo("SALARY", "SALARY", FieldInfo.SupportType.DOUBLE));
+
+    ArrayList<FieldInfo> lookups = new ArrayList<>();
+    lookups.add(new FieldInfo("ID", "ID", FieldInfo.SupportType.INTEGER));
+
+    testMeta.dbloader.setFieldInfo(lookups, included);
+
+    // Nothing counts this latch down, so the await simply times out after one second.
+    pause.await(1000, TimeUnit.MILLISECONDS);
+
+    ArrayList<Object> lookupValues = new ArrayList<Object>();
+    lookupValues.add(4);
+
+    ArrayList<Object> row = (ArrayList<Object>)testMeta.dbloader.get(lookupValues);
+
+    // Values come back in the same order as the include list above.
+    Assert.assertEquals("ID", 4, row.get(0));
+    Assert.assertEquals("NAME", "Mark", row.get(1).toString().trim());
+    Assert.assertEquals("AGE", 25, row.get(2));
+    Assert.assertEquals("ADDRESS", "Rich-Mond", row.get(3).toString().trim());
+    Assert.assertEquals("SALARY", 65000.0, row.get(4));
+  }
+
+  /**
+   * Uses a custom parameterized query (AGE and ADDRESS as bind values) instead of lookup fields and
+   * checks that the row for age 25 / "Texas" resolves to ID 2, name "Allen".
+   */
+  @Test
+  public void testMysqlDBQuery() throws Exception
+  {
+    CountDownLatch pause = new CountDownLatch(1);
+
+    String query = "Select id, name from " + testMeta.dbloader.getTableName() + " where AGE = ? and ADDRESS = ?";
+    testMeta.dbloader.setQueryStmt(query);
+
+    // Nothing counts this latch down, so the await simply times out after one second.
+    pause.await(1000, TimeUnit.MILLISECONDS);
+
+    ArrayList<FieldInfo> included = new ArrayList<>();
+    included.add(new FieldInfo("ID", "ID", FieldInfo.SupportType.INTEGER));
+    included.add(new FieldInfo("NAME", "NAME", FieldInfo.SupportType.STRING));
+
+    // No lookup fields are supplied; the query statement set above carries the placeholders.
+    testMeta.dbloader.setFieldInfo(null, included);
+
+    ArrayList<Object> bindValues = new ArrayList<Object>();
+    bindValues.add(25);
+    bindValues.add("Texas");
+
+    ArrayList<Object> row = (ArrayList<Object>)testMeta.dbloader.get(bindValues);
+
+    Assert.assertEquals("ID", 2, row.get(0));
+    Assert.assertEquals("NAME", "Allen", row.get(1).toString().trim());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java
new file mode 100644
index 0000000..7323f0d
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.TestUtils;
+
+public class MapEnricherTest
+{
+  /**
+   * Enriches a map tuple without configuring include fields, so every field of the matching
+   * {@link MemoryStore} row is expected in the emitted tuple.
+   */
+  @Test
+  public void includeAllKeys()
+  {
+    MapEnricher oper = new MapEnricher();
+    oper.setStore(new MemoryStore());
+    oper.setLookupFields(Arrays.asList("In1"));
+    oper.setup(null);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    TestUtils.setSink(oper.output, sink);
+
+    Map<String, Object> inMap = Maps.newHashMap();
+    inMap.put("In1", "Value1");
+    inMap.put("In2", "Value2");
+
+    oper.activate(null);
+    oper.beginWindow(1);
+    oper.input.process(inMap);
+    oper.endWindow();
+    oper.deactivate();
+
+    // The failure message names this test (it previously said "includeSelectedKeys").
+    Assert.assertEquals("includeAllKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
+    // The store row for "Value1" carries In1=Value3, hence In1=Value3 in the expected tuple.
+    // NOTE(review): the comparison is on toString(), so it depends on the emitted map's iteration order.
+    Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, C=Val_C, In2=Value2, In1=Value3}",
+        sink.collectedTuples.get(0).toString());
+  }
+
+  /**
+   * Enriches a map tuple with include fields restricted to A and B and checks that only those two
+   * store fields are added to the input tuple.
+   */
+  @Test
+  public void includeSelectedKeys()
+  {
+    Map<String, Object> tuple = Maps.newHashMap();
+    tuple.put("In1", "Value1");
+    tuple.put("In2", "Value2");
+
+    MapEnricher enricher = new MapEnricher();
+    enricher.setStore(new MemoryStore());
+    enricher.setLookupFields(Arrays.asList("In1"));
+    enricher.setIncludeFields(Arrays.asList("A", "B"));
+    enricher.setup(null);
+
+    CollectorTestSink collector = new CollectorTestSink();
+    TestUtils.setSink(enricher.output, collector);
+
+    // Drive one full activate / window / deactivate cycle around the single tuple.
+    enricher.activate(null);
+    enricher.beginWindow(1);
+    enricher.input.process(tuple);
+    enricher.endWindow();
+    enricher.deactivate();
+
+    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, collector.collectedTuples.size());
+    Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, In2=Value2, In1=Value1}",
+        collector.collectedTuples.get(0).toString());
+  }
+
+  /**
+   * Runs {@link EnrichApplication} in local mode for ten seconds; the test passes when the run
+   * completes without throwing.
+   */
+  @Test
+  public void testApplication() throws Exception
+  {
+    LocalMode localMode = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    localMode.prepareDAG(new EnrichApplication(), conf);
+    // runs for 10 seconds and quits
+    localMode.getController().run(10000);
+  }
+
+  /**
+   * Test application wiring RandomMapGenerator -> MapEnricher -> ConsoleOutputOperator, with the
+   * enricher backed by a {@link MemoryStore}, looking up on In1 and including fields A and B.
+   */
+  public static class EnrichApplication implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration configuration)
+    {
+      RandomMapGenerator generator = dag.addOperator("Input", RandomMapGenerator.class);
+      MapEnricher enricher = dag.addOperator("Enrich", MapEnricher.class);
+      ConsoleOutputOperator consoleOut = dag.addOperator("Console", ConsoleOutputOperator.class);
+      consoleOut.setSilent(true);
+
+      List<String> lookups = new ArrayList<>();
+      lookups.add("In1");
+      List<String> includes = new ArrayList<>();
+      includes.add("A");
+      includes.add("B");
+
+      enricher.setStore(new MemoryStore());
+      enricher.setIncludeFields(includes);
+      enricher.setLookupFields(lookups);
+
+      dag.addStream("S1", generator.output, enricher.input);
+      dag.addStream("S2", enricher.output, consoleOut.input);
+    }
+  }
+
+  /**
+   * Input operator that emits a two-entry map on every {@link #emitTuples()} call.
+   * {@code key} is never modified in this class, so each tuple is {In1=Value1, In2=Value3}.
+   */
+  public static class RandomMapGenerator extends BaseOperator implements InputOperator
+  {
+    private int key = 0;
+
+    public final transient DefaultOutputPort output = new DefaultOutputPort();
+
+    @Override
+    public void emitTuples()
+    {
+      final int suffix = key + 1;
+      Map<String, String> tuple = new HashMap<>();
+      tuple.put("In" + suffix, "Value" + suffix);
+      tuple.put("In2", "Value3");
+      output.emit(tuple);
+    }
+  }
+
+  /**
+   * In-memory {@link BackendLoader} stub used by the tests in this class.
+   * <p>
+   * It serves two fixed rows keyed by "Value1" and "Value2". Only {@link #get(Object)} and
+   * {@link #setFieldInfo(List, List)} do real work; every other method is a no-op or returns
+   * {@code null}/{@code false}.
+   */
+  private static class MemoryStore implements BackendLoader
+  {
+    // Fully parameterized (previously the raw Map<String, Map>) so get() needs no unchecked assignment.
+    static final Map<String, Map<String, String>> returnData = Maps.newHashMap();
+    private List<FieldInfo> includeFieldInfo;
+
+    static {
+      Map<String, String> map = Maps.newHashMap();
+      map.put("A", "Val_A");
+      map.put("B", "Val_B");
+      map.put("C", "Val_C");
+      map.put("In1", "Value3");
+      returnData.put("Value1", map);
+
+      map = Maps.newHashMap();
+      map.put("A", "Val_A_1");
+      map.put("B", "Val_B_1");
+      map.put("C", "Val_C");
+      map.put("In1", "Value3");
+      returnData.put("Value2", map);
+    }
+
+    @Override
+    public Map<Object, Object> loadInitialData()
+    {
+      return null;
+    }
+
+    /**
+     * Returns the values of the stored row selected by the first element of {@code key}.
+     *
+     * @param key a list whose first element is the row key ("Value1" or "Value2")
+     * @return an {@code ArrayList} with one value per include field, in include-field order
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public Object get(Object key)
+    {
+      List<String> keyList = (List<String>)key;
+      // NOTE(review): an unknown key yields a null row and a NullPointerException below.
+      Map<String, String> keyValue = returnData.get(keyList.get(0));
+      ArrayList<Object> lst = new ArrayList<Object>();
+
+      // With no include fields configured, derive them once from the row's own keys.
+      if (CollectionUtils.isEmpty(includeFieldInfo)) {
+        if (includeFieldInfo == null) {
+          includeFieldInfo = new ArrayList<>();
+        }
+        for (Map.Entry<String, String> entry : keyValue.entrySet()) {
+          // TODO: Identify the types..
+          includeFieldInfo.add(new FieldInfo(entry.getKey(), entry.getKey(), FieldInfo.SupportType.OBJECT));
+        }
+      }
+
+      for (FieldInfo fieldInfo : includeFieldInfo) {
+        lst.add(keyValue.get(fieldInfo.getColumnName()));
+      }
+
+      return lst;
+    }
+
+    @Override
+    public List<Object> getAll(List<Object> keys)
+    {
+      return null;
+    }
+
+    @Override
+    public void put(Object key, Object value)
+    {
+      // no-op: the fixture data is read-only
+    }
+
+    @Override
+    public void putAll(Map<Object, Object> m)
+    {
+      // no-op: the fixture data is read-only
+    }
+
+    @Override
+    public void remove(Object key)
+    {
+      // no-op: the fixture data is read-only
+    }
+
+    @Override
+    public void connect() throws IOException
+    {
+      // no-op: nothing to connect to
+    }
+
+    @Override
+    public void disconnect() throws IOException
+    {
+      // no-op: nothing to disconnect from
+    }
+
+    @Override
+    public boolean isConnected()
+    {
+      return false;
+    }
+
+    /**
+     * Stores the include fields used by {@link #get(Object)}; the lookup fields are ignored.
+     */
+    @Override
+    public void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo)
+    {
+      this.includeFieldInfo = includeFieldInfo;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/Order.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/Order.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/Order.java
new file mode 100644
index 0000000..52aa698
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/Order.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+/**
+ * Input Class for POJO Enricher test.
+ *
+ * Plain bean holding an order id (OID), an ID and an amount. POJOEnricherTest sets it as the
+ * enricher's input class and uses "ID" as the lookup field.
+ */
+public class Order
+{
+ public int OID;
+ public int ID;
+ public double amount;
+
+ /**
+  * No-argument constructor; fields keep their default values.
+  */
+ public Order()
+ {
+ // for kryo
+ }
+
+ /**
+  * Creates an order with all three fields set.
+  *
+  * @param oid value for OID
+  * @param id value for ID
+  * @param amount value for amount
+  */
+ public Order(int oid, int id, double amount)
+ {
+ this.OID = oid;
+ this.ID = id;
+ this.amount = amount;
+ }
+
+ /** @return the OID field */
+ public int getOID()
+ {
+ return OID;
+ }
+
+ /** @param OID new value for the OID field */
+ public void setOID(int OID)
+ {
+ this.OID = OID;
+ }
+
+ /** @return the ID field */
+ public int getID()
+ {
+ return ID;
+ }
+
+ /** @param ID new value for the ID field */
+ public void setID(int ID)
+ {
+ this.ID = ID;
+ }
+
+ /** @return the amount field */
+ public double getAmount()
+ {
+ return amount;
+ }
+
+ /** @param amount new value for the amount field */
+ public void setAmount(double amount)
+ {
+ this.amount = amount;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java
new file mode 100644
index 0000000..7a6bc27
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class POJOEnricherTest extends JDBCLoaderTest
+{
+  /**
+   * Enriches an {@link Order} with NAME, AGE and ADDRESS looked up by ID from the JDBC fixture.
+   * SALARY is not included, so the expected output shows it as 0.0.
+   */
+  @Test
+  public void includeSelectedKeys()
+  {
+    POJOEnricher oper = new POJOEnricher();
+    oper.setStore(testMeta.dbloader);
+    oper.setLookupFields(Arrays.asList("ID"));
+    oper.setIncludeFields(Arrays.asList("NAME", "AGE", "ADDRESS"));
+    oper.outputClass = EmployeeOrder.class;
+    oper.inputClass = Order.class;
+    oper.setup(null);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    TestUtils.setSink(oper.output, sink);
+
+    oper.activate(null);
+
+    oper.beginWindow(1);
+    Order tuple = new Order(3, 4, 700);
+    oper.input.process(tuple);
+    oper.endWindow();
+
+    oper.deactivate();
+
+    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
+    // Message typo fixed ("Ouput" -> "Output"); the expected value is unchanged.
+    Assert.assertEquals("Output Tuple: ",
+        "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=0.0}",
+        sink.collectedTuples.get(0).toString());
+  }
+
+  /**
+   * Enriches an {@link Order} with all four employee columns (NAME, AGE, ADDRESS, SALARY) looked up
+   * by ID from the JDBC fixture.
+   */
+  @Test
+  public void includeAllKeys()
+  {
+    POJOEnricher oper = new POJOEnricher();
+    oper.setStore(testMeta.dbloader);
+    oper.setLookupFields(Arrays.asList("ID"));
+    oper.setIncludeFields(Arrays.asList("NAME", "AGE", "ADDRESS", "SALARY"));
+    oper.outputClass = EmployeeOrder.class;
+    oper.inputClass = Order.class;
+    oper.setup(null);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    TestUtils.setSink(oper.output, sink);
+
+    oper.activate(null);
+
+    oper.beginWindow(1);
+    Order tuple = new Order(3, 4, 700);
+    oper.input.process(tuple);
+    oper.endWindow();
+
+    oper.deactivate();
+
+    // The failure message names this test (it previously said "includeSelectedKeys").
+    Assert.assertEquals("includeAllKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
+    // Message typo fixed ("Ouput" -> "Output"); the expected value is unchanged.
+    Assert.assertEquals("Output Tuple: ",
+        "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=65000.0}",
+        sink.collectedTuples.get(0).toString());
+  }
+
+  /**
+   * Runs {@link EnrichApplication} in local mode for ten seconds against the JDBC fixture; the
+   * per-tuple checks are made by {@link EnrichVerifier} inside the DAG.
+   */
+  @Test
+  public void testApplication() throws Exception
+  {
+    EnrichApplication app = new EnrichApplication(testMeta);
+    app.setLoader(testMeta.dbloader);
+
+    LocalMode localMode = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    localMode.prepareDAG(app, conf);
+    // runs for 10 seconds and quits
+    localMode.getController().run(10000);
+  }
+
+  /**
+   * Test application wiring RandomPOJOGenerator -> POJOEnricher -> EnrichVerifier.
+   * The verifier receives the fixture arrays from {@code TestMeta}; the enricher uses the loader
+   * supplied through {@link #setLoader(BackendLoader)}.
+   */
+  public static class EnrichApplication implements StreamingApplication
+  {
+    private final TestMeta testMeta;
+    BackendLoader loader;
+
+    public EnrichApplication(TestMeta testMeta)
+    {
+      this.testMeta = testMeta;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration configuration)
+    {
+      RandomPOJOGenerator generator = dag.addOperator("Input", RandomPOJOGenerator.class);
+      POJOEnricher enricher = dag.addOperator("Enrich", POJOEnricher.class);
+      EnrichVerifier verifier = dag.addOperator("Verify", EnrichVerifier.class);
+
+      // Hand the expected column values to the verifier.
+      verifier.address = testMeta.address;
+      verifier.age = testMeta.age;
+      verifier.names = testMeta.name;
+      verifier.salary = testMeta.salary;
+
+      ArrayList<String> lookups = new ArrayList<>();
+      lookups.add("ID");
+      ArrayList<String> includes = new ArrayList<>();
+      includes.add("NAME");
+      includes.add("AGE");
+      includes.add("ADDRESS");
+      includes.add("SALARY");
+
+      enricher.setStore(loader);
+      enricher.setLookupFields(lookups);
+      enricher.setIncludeFields(includes);
+
+      // Declare the POJO classes flowing through the enricher's ports.
+      dag.getMeta(enricher).getMeta(enricher.input).getAttributes()
+          .put(Context.PortContext.TUPLE_CLASS, Order.class);
+      dag.getMeta(enricher).getMeta(enricher.output).getAttributes()
+          .put(Context.PortContext.TUPLE_CLASS, EmployeeOrder.class);
+
+      dag.addStream("S1", generator.output, enricher.input);
+      dag.addStream("S2", enricher.output, verifier.input);
+    }
+
+    /**
+     * Sets the backend store handed to the enricher in {@link #populateDAG(DAG, Configuration)}.
+     */
+    public void setLoader(BackendLoader loader)
+    {
+      this.loader = loader;
+    }
+  }
+
+  /**
+   * Input operator that emits {@link Order} tuples with OID 1, amount 100.00 and IDs cycling
+   * through 1, 2, 3, 4. After one full cycle it stops emitting until the next window begins.
+   */
+  public static class RandomPOJOGenerator implements InputOperator
+  {
+    public transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
+    private int idx = 0;
+    private boolean emit = true;
+
+    @Override
+    public void emitTuples()
+    {
+      if (!emit) {
+        return;
+      }
+      Order o = new Order(1, idx + 1, 100.00);
+      output.emit(o);
+      // Advance explicitly. The compound form "idx += idx++ % 4" reads idx before the
+      // post-increment and then overwrites it, so idx stayed at 0, only ID 1 was ever
+      // emitted and the stop condition was never reached.
+      idx = (idx + 1) % 4;
+      if (idx == 0) {
+        // A full 1..4 cycle has been emitted; pause until beginWindow re-enables emission.
+        emit = false;
+      }
+    }
+
+    /**
+     * Re-enables emission at the start of every window.
+     */
+    @Override
+    public void beginWindow(long l)
+    {
+      emit = true;
+    }
+
+    @Override
+    public void endWindow()
+    {
+
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+    }
+
+    @Override
+    public void teardown()
+    {
+    }
+  }
+
+  /**
+   * Sink operator that asserts every incoming tuple is an {@code EmployeeOrder} whose enriched
+   * fields match the expected arrays at index {@code ID - 1}.
+   */
+  public static class EnrichVerifier extends BaseOperator
+  {
+    private static final Logger logger = LoggerFactory.getLogger(EnrichVerifier.class);
+    String[] names;
+    int[] age;
+    String[] address;
+    double[] salary;
+
+    private transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(Object o)
+      {
+        Assert.assertTrue(o instanceof EmployeeOrder);
+        EmployeeOrder enriched = (EmployeeOrder)o;
+        int id = enriched.getID();
+        // IDs must fall in 1..4 so they can index the expected arrays.
+        Assert.assertFalse(id < 1 || id > 4);
+        Assert.assertEquals(1, enriched.getOID());
+        Assert.assertEquals(100.00, enriched.getAmount(), 0);
+
+        int row = id - 1;
+        Assert.assertEquals(names[row], enriched.getNAME());
+        Assert.assertEquals(age[row], enriched.getAGE());
+        Assert.assertEquals(address[row], enriched.getADDRESS());
+        Assert.assertEquals(salary[row], enriched.getSALARY(), 0);
+      }
+    };
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java
deleted file mode 100644
index 308aa82..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class BeanEnrichmentOperatorTest extends JDBCLoaderTest
-{
- public class Order {
- public int OID;
- public int ID;
- public double amount;
-
- public Order(int oid, int id, double amount) {
- this.OID = oid;
- this.ID = id;
- this.amount = amount;
- }
- public int getOID()
- {
- return OID;
- }
-
- public void setOID(int OID)
- {
- this.OID = OID;
- }
-
- public int getID()
- {
- return ID;
- }
-
- public void setID(int ID)
- {
- this.ID = ID;
- }
-
- public double getAmount()
- {
- return amount;
- }
-
- public void setAmount(double amount)
- {
- this.amount = amount;
- }
- }
-
-
- @Test
- public void includeSelectedKeys()
- {
- POJOEnrichmentOperator oper = new POJOEnrichmentOperator();
- oper.setStore(testMeta.dbloader);
- oper.setLookupFieldsStr("ID");
- oper.setIncludeFieldsStr("NAME,AGE,ADDRESS");
- oper.outputClass = EmployeeOrder.class;
- oper.setup(null);
-
- CollectorTestSink sink = new CollectorTestSink();
- TestUtils.setSink(oper.output, sink);
-
- oper.beginWindow(1);
- Order tuple = new Order(3, 4, 700);
- oper.input.process(tuple);
- oper.endWindow();
-
- Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
- Assert.assertEquals("Ouput Tuple: ", "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=0.0}", sink.collectedTuples.get(0).toString());
- }
- @Test
- public void includeAllKeys()
- {
- POJOEnrichmentOperator oper = new POJOEnrichmentOperator();
- oper.setStore(testMeta.dbloader);
- oper.setLookupFieldsStr("ID");
- oper.outputClass = EmployeeOrder.class;
- oper.setup(null);
-
- CollectorTestSink sink = new CollectorTestSink();
- TestUtils.setSink(oper.output, sink);
-
-
- oper.beginWindow(1);
- Order tuple = new Order(3, 4, 700);
- oper.input.process(tuple);
- oper.endWindow();
-
- Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
- Assert.assertEquals("Ouput Tuple: ", "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=65000.0}", sink.collectedTuples.get(0).toString());
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java
deleted file mode 100644
index 00c9d82..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-// This class is needed for Bean Enrichment Operator testing
-public class EmployeeOrder {
- public int OID;
- public int ID;
- public double amount;
- public String NAME;
- public int AGE;
- public String ADDRESS;
- public double SALARY;
-
- public int getOID()
- {
- return OID;
- }
-
- public void setOID(int OID)
- {
- this.OID = OID;
- }
-
- public int getID()
- {
- return ID;
- }
-
- public void setID(int ID)
- {
- this.ID = ID;
- }
-
- public int getAGE()
- {
- return AGE;
- }
-
- public void setAGE(int AGE)
- {
- this.AGE = AGE;
- }
-
- public String getNAME()
- {
- return NAME;
- }
-
- public void setNAME(String NAME)
- {
- this.NAME = NAME;
- }
-
- public double getAmount()
- {
- return amount;
- }
-
- public void setAmount(double amount)
- {
- this.amount = amount;
- }
-
- public String getADDRESS()
- {
- return ADDRESS;
- }
-
- public void setADDRESS(String ADDRESS)
- {
- this.ADDRESS = ADDRESS;
- }
-
- public double getSALARY()
- {
- return SALARY;
- }
-
- public void setSALARY(double SALARY)
- {
- this.SALARY = SALARY;
- }
-
- @Override public String toString()
- {
- return "{" +
- "OID=" + OID +
- ", ID=" + ID +
- ", amount=" + amount +
- ", NAME='" + NAME + '\'' +
- ", AGE=" + AGE +
- ", ADDRESS='" + ADDRESS.trim() + '\'' +
- ", SALARY=" + SALARY +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java
deleted file mode 100644
index 934d73b..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-import com.esotericsoftware.kryo.Kryo;
-import com.google.common.collect.Maps;
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.Map;
-
-
-public class FileEnrichmentTest
-{
-
- @Rule public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();
-
- @Test public void testEnrichmentOperator() throws IOException, InterruptedException
- {
- URL origUrl = this.getClass().getResource("/productmapping.txt");
-
- URL fileUrl = new URL(this.getClass().getResource("/").toString() + "productmapping1.txt");
- FileUtils.deleteQuietly(new File(fileUrl.getPath()));
- FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath()));
-
- MapEnrichmentOperator oper = new MapEnrichmentOperator();
- FSLoader store = new FSLoader();
- store.setFileName(fileUrl.toString());
- oper.setLookupFieldsStr("productId");
- oper.setStore(store);
-
- oper.setup(null);
-
- /* File contains 6 entries, but operator one entry is duplicate,
- * so cache should contains only 5 entries after scanning input file.
- */
- //Assert.assertEquals("Number of mappings ", 7, oper.cache.size());
-
- CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<Map<String, Object>>();
- @SuppressWarnings({ "unchecked", "rawtypes" }) CollectorTestSink<Object> tmp = (CollectorTestSink) sink;
- oper.output.setSink(tmp);
-
- oper.beginWindow(0);
- Map<String, Object> tuple = Maps.newHashMap();
- tuple.put("productId", 3);
- tuple.put("channelId", 4);
- tuple.put("amount", 10.0);
-
- Kryo kryo = new Kryo();
- oper.input.process(kryo.copy(tuple));
-
- oper.endWindow();
-
- /* Number of tuple, emitted */
- Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
- Map<String, Object> emitted = sink.collectedTuples.iterator().next();
-
- /* The fields present in original event is kept as it is */
- Assert.assertEquals("Number of fields in emitted tuple", 4, emitted.size());
- Assert.assertEquals("value of productId is 3", tuple.get("productId"), emitted.get("productId"));
- Assert.assertEquals("value of channelId is 4", tuple.get("channelId"), emitted.get("channelId"));
- Assert.assertEquals("value of amount is 10.0", tuple.get("amount"), emitted.get("amount"));
-
- /* Check if productCategory is added to the event */
- Assert.assertEquals("productCategory is part of tuple", true, emitted.containsKey("productCategory"));
- Assert.assertEquals("value of product category is 1", 5, emitted.get("productCategory"));
-
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java
deleted file mode 100644
index 07be982..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.netlet.util.DTThrowable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-import org.slf4j.LoggerFactory;
-
-public class HBaseLoaderTest // Exercises HBaseLoader.get() against an HBase table that the TestMeta rule creates, fills and drops.
-{
- static final org.slf4j.Logger logger = LoggerFactory.getLogger(HBaseLoaderTest.class);
- // JUnit rule: before each test it connects the loader and creates/populates the table; afterwards it drops the table.
- public static class TestMeta extends TestWatcher
- {
- // Loader under test; configured and connected in starting().
- HBaseLoader dbloader;
- @Override
- protected void starting(Description description)
- {
- try {
- dbloader = new HBaseLoader();
- Configuration conf = HBaseConfiguration.create();
- conf.addResource(new Path("file:///home/chaitanya/hbase-site.xml")); // NOTE(review): hard-coded, machine-specific path
-
- dbloader.setConfiguration(conf);
- dbloader.setZookeeperQuorum("localhost");
- dbloader.setZookeeperClientPort(2181);
-
- dbloader.setTableName("EMPLOYEE");
-
- dbloader.connect();
- createTable();
- insertRecordsInTable();
- }
- catch (Throwable e) {
- DTThrowable.rethrow(e);
- }
- }
- // Creates the loader's table with the column families "personal" and "professional".
- private void createTable()
- {
- try {
- String[] familys = { "personal", "professional" };
- HBaseAdmin admin = new HBaseAdmin(dbloader.getConfiguration());
- HTableDescriptor tableDesc = new HTableDescriptor(dbloader.getTableName());
- for (int i = 0; i < familys.length; i++) {
- tableDesc.addFamily(new HColumnDescriptor(familys[i]));
- }
- admin.createTable(tableDesc);
-
- logger.debug("Table created successfully...");
- }
- catch (Throwable e) {
- DTThrowable.rethrow(e);
- }
- }
- // Writes one cell (row rowKey, column family:qualifier, content value) into the loader's table.
- @SuppressWarnings("deprecation")
- public void addRecord(String rowKey, String family, String qualifier, String value) throws Exception {
- try {
- HTable table = new HTable(dbloader.getConfiguration(), dbloader.getTableName()); // NOTE(review): the HTable is never closed in this method
- Put put = new Put(Bytes.toBytes(rowKey));
- put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes
- .toBytes(value));
- table.put(put);
- } catch (IOException e) {
- DTThrowable.rethrow(e);
- }
- }
- private void insertRecordsInTable() // Seeds row1..row3 with personal:name/city and professional:designation/Salary cells.
- {
- try {
- addRecord("row1", "personal", "name", "raju");
- addRecord("row1", "personal", "city", "hyderabad");
- addRecord("row1", "professional", "designation", "manager");
- addRecord("row1", "professional", "Salary", "50000");
-
- addRecord("row2", "personal", "name", "ravi");
- addRecord("row2", "personal", "city", "Chennai");
- addRecord("row2", "professional", "designation", "SE");
- addRecord("row2", "professional", "Salary", "30000");
-
- addRecord("row3", "personal", "name", "rajesh");
- addRecord("row3", "personal", "city", "Delhi");
- addRecord("row3", "professional", "designation", "E");
- addRecord("row3", "professional", "Salary", "10000");
- }
- catch (Throwable e) {
- DTThrowable.rethrow(e);
- }
-
- }
- // Disables and deletes the table; failures are only printed via printStackTrace(), not rethrown.
- private void cleanTable()
- {
- String sql = "delete from " + dbloader.getTableName(); // NOTE(review): unused local; apparently left over from the JDBC variant of this test
- try {
- HBaseAdmin admin = new HBaseAdmin(dbloader.getConfiguration());
- admin.disableTable(dbloader.getTableName());
- admin.deleteTable(dbloader.getTableName());
- } catch (MasterNotRunningException e) {
- e.printStackTrace();
- } catch (ZooKeeperConnectionException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- // Runs after each test: removes the table created in starting().
- @Override
- protected void finished(Description description)
- {
- cleanTable();
- }
- }
-
- @Rule
- public TestMeta testMeta = new TestMeta();
- // Looks up row key "row2" and checks the returned city and Salary values.
- @Test
- public void testHBaseLookup() throws Exception
- {
- CountDownLatch latch = new CountDownLatch(1); // never counted down, so the await() below is effectively a 1 s pause
-
- ArrayList<String> includeKeys = new ArrayList<String>();
- includeKeys.add("city");
- includeKeys.add("Salary");
- ArrayList<String> lookupKeys = new ArrayList<String>();
- lookupKeys.add("ID");
- testMeta.dbloader.setFields(lookupKeys, includeKeys);
-
- String includeFamilyStr = "personal, professional";
- testMeta.dbloader.setIncludeFamilyStr(includeFamilyStr);
-
- latch.await(1000, TimeUnit.MILLISECONDS);
-
- ArrayList<Object> keys = new ArrayList<Object>();
- keys.add("row2");
-
- ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys);
- // The assertions expect the values in includeKeys order: city first, then Salary.
- Assert.assertEquals("CITY", "Chennai", columnInfo.get(0).toString().trim());
- Assert.assertEquals("Salary", 30000, columnInfo.get(1)); // NOTE(review): inserted as the string "30000" but compared to an Integer; relies on HBaseLoader's conversion (not visible here)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java
deleted file mode 100644
index 72cfb88..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.netlet.util.DTThrowable;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.slf4j.LoggerFactory;
-
-public class JDBCLoaderTest // Exercises JDBCLoader.get() against an in-memory HSQLDB table managed by the TestMeta rule.
-{
- static final org.slf4j.Logger logger = LoggerFactory.getLogger(JDBCLoaderTest.class);
- // JUnit rule: before each test it connects the loader and creates/populates COMPANY; afterwards it deletes the rows.
- public static class TestMeta extends TestWatcher
- {
- JDBCLoader dbloader; // loader under test; configured and connected in starting()
- @Override
- protected void starting(Description description)
- {
- try {
- dbloader = new JDBCLoader();
- dbloader.setDatabaseDriver("org.hsqldb.jdbcDriver");
- dbloader.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");
- dbloader.setTableName("COMPANY");
-
- dbloader.connect();
- createTable();
- insertRecordsInTable();
- }
- catch (Throwable e) {
- DTThrowable.rethrow(e);
- }
- }
- // Creates the table, if it does not exist yet, with columns ID, NAME, AGE, ADDRESS and SALARY.
- private void createTable()
- {
- try {
- Statement stmt = dbloader.getConnection().createStatement(); // NOTE(review): the Statement is never closed
-
- String createTable = "CREATE TABLE IF NOT EXISTS " + dbloader.getTableName() +
- "(ID INT PRIMARY KEY NOT NULL," +
- " NAME TEXT NOT NULL, " +
- " AGE INT NOT NULL, " +
- " ADDRESS CHAR(50), " +
- " SALARY REAL)";
- stmt.executeUpdate(createTable);
-
- logger.debug("Table created successfully...");
- }
- catch (Throwable e) {
- DTThrowable.rethrow(e);
- }
- }
- // Inserts the four fixed rows (IDs 1-4) that the test assertions rely on.
- private void insertRecordsInTable()
- {
- try {
- Statement stmt = dbloader.getConnection().createStatement(); // NOTE(review): the Statement is never closed
- String tbName = dbloader.getTableName();
-
- String sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
- "VALUES (1, 'Paul', 32, 'California', 20000.00 );";
- stmt.executeUpdate(sql);
-
- sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
- "VALUES (2, 'Allen', 25, 'Texas', 15000.00 );";
- stmt.executeUpdate(sql);
-
- sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
- "VALUES (3, 'Teddy', 23, 'Norway', 20000.00 );";
- stmt.executeUpdate(sql);
-
- sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
- "VALUES (4, 'Mark', 25, 'Rich-Mond', 65000.00 );";
- stmt.executeUpdate(sql);
- }
- catch (Throwable e) {
- DTThrowable.rethrow(e);
- }
-
- }
- // Deletes every row from the table; the table itself is not dropped.
- private void cleanTable()
- {
- String sql = "delete from " + dbloader.tableName; // NOTE(review): direct field access; the other methods use getTableName()
- try {
- Statement stmt = dbloader.getConnection().createStatement();
- stmt.executeUpdate(sql);
- logger.debug("Table deleted successfully..."); // NOTE(review): misleading message; only the rows are deleted
- } catch (SQLException e) {
- DTThrowable.rethrow(e);
- }
- }
- // Runs after each test: empties the table filled in starting().
- @Override
- protected void finished(Description description)
- {
- cleanTable();
- }
- }
-
- @Rule
- public TestMeta testMeta = new TestMeta();
- // Looks up ID "4" with an explicit include list and checks NAME, AGE and ADDRESS.
- @Test
- public void testMysqlDBLookup() throws Exception
- {
- CountDownLatch latch = new CountDownLatch(1); // never counted down, so the await() below is effectively a 1 s pause
-
- ArrayList<String> lookupKeys = new ArrayList<String>();
- lookupKeys.add("ID");
- ArrayList<String> includeKeys = new ArrayList<String>();
- includeKeys.add("NAME");
- includeKeys.add("AGE");
- includeKeys.add("ADDRESS");
- testMeta.dbloader.setFields(lookupKeys, includeKeys);
-
- latch.await(1000, TimeUnit.MILLISECONDS);
-
- ArrayList<Object> keys = new ArrayList<Object>();
- keys.add("4");
-
- ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys);
- // The assertions expect the values in includeKeys order.
- Assert.assertEquals("NAME", "Mark", columnInfo.get(0).toString().trim());
- Assert.assertEquals("AGE", 25, columnInfo.get(1));
- Assert.assertEquals("ADDRESS", "Rich-Mond", columnInfo.get(2).toString().trim());
- }
- // Same lookup with an empty include list: the assertions expect all five columns of the row.
- @Test
- public void testMysqlDBLookupIncludeAllKeys() throws Exception
- {
- CountDownLatch latch = new CountDownLatch(1); // never counted down; used only as a 1 s pause
-
- ArrayList<String> lookupKeys = new ArrayList<String>();
- lookupKeys.add("ID");
- ArrayList<String> includeKeys = new ArrayList<String>();
- testMeta.dbloader.setFields(lookupKeys, includeKeys);
-
- latch.await(1000, TimeUnit.MILLISECONDS);
-
- ArrayList<Object> keys = new ArrayList<Object>();
- keys.add("4");
-
- ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys);
- // Expected order matches the column order of the CREATE TABLE statement.
- Assert.assertEquals("ID", 4, columnInfo.get(0));
- Assert.assertEquals("NAME", "Mark", columnInfo.get(1).toString().trim());
- Assert.assertEquals("AGE", 25, columnInfo.get(2));
- Assert.assertEquals("ADDRESS", "Rich-Mond", columnInfo.get(3).toString().trim());
- Assert.assertEquals("SALARY", 65000.0, columnInfo.get(4));
- }
- // Uses a custom query statement instead of lookup/include fields.
- @Test
- public void testMysqlDBQuery() throws Exception
- {
- CountDownLatch latch = new CountDownLatch(1); // never counted down; used only as a 1 s pause
-
- testMeta.dbloader.setQueryStmt("Select id, name from " + testMeta.dbloader.getTableName() + " where AGE = ? and ADDRESS = ?");
-
- latch.await(1000, TimeUnit.MILLISECONDS);
- // Presumably bound to the two '?' placeholders in order (AGE, then ADDRESS); the binding is done inside JDBCLoader.
- ArrayList<Object> keys = new ArrayList<Object>();
- keys.add("25");
- keys.add("Texas");
-
- ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys);
-
- Assert.assertEquals("ID", 2, columnInfo.get(0));
- Assert.assertEquals("NAME", "Allen", columnInfo.get(1).toString().trim());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java
deleted file mode 100644
index 845fe80..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-import com.google.common.collect.Maps;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections.CollectionUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class MapEnrichmentOperatorTest // Tests MapEnrichmentOperator against the in-memory EnrichmentBackup stub MemoryStore.
-{
- @Test
- public void includeAllKeys()
- {
- MapEnrichmentOperator oper = new MapEnrichmentOperator();
- oper.setStore(new MemoryStore());
- oper.setLookupFieldsStr("In1");
- oper.setup(null);
- // No include fields are set, so the expected tuple below carries every field of the stored record.
- CollectorTestSink sink = new CollectorTestSink();
- TestUtils.setSink(oper.output, sink);
-
- Map<String, Object> inMap = Maps.newHashMap();
- inMap.put("In1", "Value1");
- inMap.put("In2", "Value2");
-
- oper.beginWindow(1);
- oper.input.process(inMap);
- oper.endWindow();
-
- Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size()); // NOTE(review): message names the other test (copy-paste)
- Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, C=Val_C, In2=Value2, In1=Value3}", sink.collectedTuples.get(0).toString()); // NOTE(review): compares toString(), so it is sensitive to the emitted map's iteration order
- }
-
- @Test
- public void includeSelectedKeys()
- {
- MapEnrichmentOperator oper = new MapEnrichmentOperator();
- oper.setStore(new MemoryStore());
- oper.setLookupFieldsStr("In1");
- oper.setIncludeFieldsStr("A,B");
- oper.setup(null);
- // Only A and B are requested, so the expected tuple keeps the input's In1 and omits C.
- CollectorTestSink sink = new CollectorTestSink();
- TestUtils.setSink(oper.output, sink);
-
- Map<String, Object> inMap = Maps.newHashMap();
- inMap.put("In1", "Value1");
- inMap.put("In2", "Value2");
-
- oper.beginWindow(1);
- oper.input.process(inMap);
- oper.endWindow();
-
- Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
- Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, In2=Value2, In1=Value1}", sink.collectedTuples.get(0).toString()); // NOTE(review): compares toString(), so it is sensitive to the emitted map's iteration order
- }
- // Stub store: serves two fixed records keyed by "Value1" and "Value2"; mutators and connection methods do nothing.
- private static class MemoryStore implements EnrichmentBackup
- {
- static Map<String, Map> returnData = Maps.newHashMap();
- private List<String> includeFields;
- static {
- Map<String, String> map = Maps.newHashMap();
- map.put("A", "Val_A");
- map.put("B", "Val_B");
- map.put("C", "Val_C");
- map.put("In1", "Value3");
- returnData.put("Value1", map);
-
- map = Maps.newHashMap();
- map.put("A", "Val_A_1");
- map.put("B", "Val_B_1");
- map.put("C", "Val_C");
- map.put("In1", "Value3");
- returnData.put("Value2", map);
- }
-
- @Override public Map<Object, Object> loadInitialData()
- {
- return null;
- }
- // Returns the values of includeFields, in that order, for the record keyed by the first element of the key list.
- @Override public Object get(Object key)
- {
- List<String> keyList = (List<String>)key;
- Map<String, String> keyValue = returnData.get(keyList.get(0)); // NOTE(review): null for an unknown key, which would fail below
- ArrayList<Object> lst = new ArrayList<Object>();
- if(CollectionUtils.isEmpty(includeFields)) { // nothing configured: use every field of the record
- if(includeFields == null)
- includeFields = new ArrayList<String>();
- for (Map.Entry<String, String> e : keyValue.entrySet()) {
- includeFields.add(e.getKey()); // side effect: fills the list handed to setFields(), or a new one if that was null
- }
- }
- for(String field : includeFields) {
- lst.add(keyValue.get(field));
- }
- return lst;
- }
-
- @Override public List<Object> getAll(List<Object> keys)
- {
- return null;
- }
-
- @Override public void put(Object key, Object value)
- {
- // no-op in this stub
- }
-
- @Override public void putAll(Map<Object, Object> m)
- {
- // no-op in this stub
- }
-
- @Override public void remove(Object key)
- {
- // no-op in this stub
- }
-
- @Override public void connect() throws IOException
- {
- // no-op in this stub
- }
-
- @Override public void disconnect() throws IOException
- {
- // no-op in this stub
- }
-
- @Override public boolean isConnected()
- {
- return false;
- }
-
- @Override public void setFields(List<String> lookupFields, List<String> includeFields)
- {
- this.includeFields = includeFields;
- // lookupFields is ignored by this stub
- }
-
- @Override
- public boolean needRefresh() {
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/resources/productmapping.txt
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/productmapping.txt b/contrib/src/test/resources/productmapping.txt
new file mode 100755
index 0000000..ece99ff
--- /dev/null
+++ b/contrib/src/test/resources/productmapping.txt
@@ -0,0 +1,100 @@
+{"productCategory": 5, "productId": 0}
+{"productCategory": 4, "productId": 1}
+{"productCategory": 5, "productId": 2}
+{"productCategory": 5, "productId": 3}
+{"productCategory": 5, "productId": 4}
+{"productCategory": 1, "productId": 5}
+{"productCategory": 2, "productId": 6}
+{"productCategory": 4, "productId": 7}
+{"productCategory": 2, "productId": 8}
+{"productCategory": 3, "productId": 9}
+{"productCategory": 1, "productId": 10}
+{"productCategory": 5, "productId": 11}
+{"productCategory": 5, "productId": 12}
+{"productCategory": 1, "productId": 13}
+{"productCategory": 1, "productId": 14}
+{"productCategory": 2, "productId": 15}
+{"productCategory": 3, "productId": 16}
+{"productCategory": 5, "productId": 17}
+{"productCategory": 2, "productId": 18}
+{"productCategory": 2, "productId": 19}
+{"productCategory": 2, "productId": 20}
+{"productCategory": 3, "productId": 21}
+{"productCategory": 2, "productId": 22}
+{"productCategory": 5, "productId": 23}
+{"productCategory": 4, "productId": 24}
+{"productCategory": 1, "productId": 25}
+{"productCategory": 3, "productId": 26}
+{"productCategory": 3, "productId": 27}
+{"productCategory": 3, "productId": 28}
+{"productCategory": 5, "productId": 29}
+{"productCategory": 2, "productId": 30}
+{"productCategory": 3, "productId": 31}
+{"productCategory": 3, "productId": 32}
+{"productCategory": 3, "productId": 33}
+{"productCategory": 1, "productId": 34}
+{"productCategory": 3, "productId": 35}
+{"productCategory": 2, "productId": 36}
+{"productCategory": 1, "productId": 37}
+{"productCategory": 3, "productId": 38}
+{"productCategory": 2, "productId": 39}
+{"productCategory": 1, "productId": 40}
+{"productCategory": 5, "productId": 41}
+{"productCategory": 3, "productId": 42}
+{"productCategory": 5, "productId": 43}
+{"productCategory": 2, "productId": 44}
+{"productCategory": 4, "productId": 45}
+{"productCategory": 5, "productId": 46}
+{"productCategory": 2, "productId": 47}
+{"productCategory": 3, "productId": 48}
+{"productCategory": 5, "productId": 49}
+{"productCategory": 5, "productId": 50}
+{"productCategory": 4, "productId": 51}
+{"productCategory": 5, "productId": 52}
+{"productCategory": 1, "productId": 53}
+{"productCategory": 5, "productId": 54}
+{"productCategory": 4, "productId": 55}
+{"productCategory": 4, "productId": 56}
+{"productCategory": 2, "productId": 57}
+{"productCategory": 4, "productId": 58}
+{"productCategory": 4, "productId": 59}
+{"productCategory": 4, "productId": 60}
+{"productCategory": 1, "productId": 61}
+{"productCategory": 2, "productId": 62}
+{"productCategory": 3, "productId": 63}
+{"productCategory": 5, "productId": 64}
+{"productCategory": 1, "productId": 65}
+{"productCategory": 5, "productId": 66}
+{"productCategory": 5, "productId": 67}
+{"productCategory": 2, "productId": 68}
+{"productCategory": 3, "productId": 69}
+{"productCategory": 3, "productId": 70}
+{"productCategory": 2, "productId": 71}
+{"productCategory": 3, "productId": 72}
+{"productCategory": 4, "productId": 73}
+{"productCategory": 2, "productId": 74}
+{"productCategory": 3, "productId": 75}
+{"productCategory": 3, "productId": 76}
+{"productCategory": 4, "productId": 77}
+{"productCategory": 5, "productId": 78}
+{"productCategory": 4, "productId": 79}
+{"productCategory": 1, "productId": 80}
+{"productCategory": 1, "productId": 81}
+{"productCategory": 1, "productId": 82}
+{"productCategory": 3, "productId": 83}
+{"productCategory": 1, "productId": 84}
+{"productCategory": 5, "productId": 85}
+{"productCategory": 3, "productId": 86}
+{"productCategory": 4, "productId": 87}
+{"productCategory": 1, "productId": 88}
+{"productCategory": 5, "productId": 89}
+{"productCategory": 3, "productId": 90}
+{"productCategory": 5, "productId": 91}
+{"productCategory": 2, "productId": 92}
+{"productCategory": 2, "productId": 93}
+{"productCategory": 3, "productId": 94}
+{"productCategory": 1, "productId": 95}
+{"productCategory": 1, "productId": 96}
+{"productCategory": 5, "productId": 97}
+{"productCategory": 3, "productId": 98}
+{"productCategory": 5, "productId": 99}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java b/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
index e07bd04..d70ca0c 100644
--- a/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
+++ b/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
@@ -20,6 +20,8 @@ package com.datatorrent.lib.util;
import javax.validation.constraints.NotNull;
+import org.apache.commons.lang3.ClassUtils;
+
@SuppressWarnings("rawtypes")
/**
* @since 3.3.0
@@ -114,7 +116,7 @@ public class FieldInfo
}
public static enum SupportType {
- BOOLEAN(Boolean.class), SHORT(Short.class), INTEGER(Integer.class), LONG(Long.class), FLOAT(Float.class), DOUBLE(Double.class), STRING(String.class);
+ BOOLEAN(Boolean.class), SHORT(Short.class), INTEGER(Integer.class), LONG(Long.class), FLOAT(Float.class), DOUBLE(Double.class), STRING(String.class), OBJECT(Object.class);
private Class javaType;
@@ -127,6 +129,17 @@ public class FieldInfo
{
return javaType;
}
+
+ public static SupportType getFromJavaType(Class type) // Maps a Java class to the SupportType with that Java type; anything unmatched maps to OBJECT.
+ {
+ for (SupportType supportType : SupportType.values()) {
+ if (supportType.getJavaType() == ClassUtils.primitiveToWrapper(type)) { // primitives (e.g. int.class) are compared via their wrapper class; identity match only
+ return supportType;
+ }
+ }
+ // No constant's Java type is identical to the (wrapped) type: fall back to OBJECT.
+ return OBJECT;
+ }
}
}