You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sa...@apache.org on 2016/05/10 09:05:57 UTC
[3/4] incubator-apex-malhar git commit: APEXMALHAR-2023 Adding
Enrichment Operator to Malhar
APEXMALHAR-2023 Adding Enrichment Operator to Malhar
Added 2 operators, POJOEnricher and MapEnricher, which enrich the given POJO or map as configured.
The operators are marked evolving.
Test cases added for both operators.
2 Backend loaders are added for File and JDBC.
HBase loader removed as there are some things that need to be discussed. Its implementation is postponed until after that discussion.
Added a new SupportType for OBJECT to FieldInfo.
Added static method in FieldInfo for conversion to Class -> SupportedType.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9600eddd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9600eddd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9600eddd
Branch: refs/heads/master
Commit: 9600eddd8ee4fa010cbb4799da74e89d184daa40
Parents: 528e7ac
Author: chinmaykolhatkar <ch...@datatorrent.com>
Authored: Tue Mar 29 14:46:54 2016 +0530
Committer: chinmaykolhatkar <ch...@datatorrent.com>
Committed: Fri Apr 29 22:59:48 2016 +0530
----------------------------------------------------------------------
.../contrib/enrich/AbstractEnricher.java | 321 +++++++++++++++++++
.../contrib/enrich/BackendLoader.java | 42 +++
.../datatorrent/contrib/enrich/FSLoader.java | 184 +++++++++++
.../datatorrent/contrib/enrich/JDBCLoader.java | 201 ++++++++++++
.../datatorrent/contrib/enrich/MapEnricher.java | 138 ++++++++
.../contrib/enrich/NullValuesCacheManager.java | 60 ++++
.../contrib/enrich/POJOEnricher.java | 287 +++++++++++++++++
.../contrib/enrich/ReadOnlyBackup.java | 61 ++++
.../enrichment/AbstractEnrichmentOperator.java | 170 ----------
.../contrib/enrichment/EnrichmentBackup.java | 18 --
.../contrib/enrichment/FSLoader.java | 146 ---------
.../contrib/enrichment/HBaseLoader.java | 128 --------
.../contrib/enrichment/JDBCLoader.java | 158 ---------
.../enrichment/MapEnrichmentOperator.java | 57 ----
.../enrichment/NullValuesCacheManager.java | 43 ---
.../enrichment/POJOEnrichmentOperator.java | 185 -----------
.../contrib/enrichment/ReadOnlyBackup.java | 38 ---
.../contrib/enrichment/package-info.java | 1 -
.../contrib/enrich/EmployeeOrder.java | 114 +++++++
.../contrib/enrich/FileEnrichmentTest.java | 103 ++++++
.../contrib/enrich/JDBCLoaderTest.java | 209 ++++++++++++
.../contrib/enrich/MapEnricherTest.java | 251 +++++++++++++++
.../com/datatorrent/contrib/enrich/Order.java | 71 ++++
.../contrib/enrich/POJOEnricherTest.java | 232 ++++++++++++++
.../enrichment/BeanEnrichmentOperatorTest.java | 95 ------
.../contrib/enrichment/EmployeeOrder.java | 95 ------
.../contrib/enrichment/FileEnrichmentTest.java | 75 -----
.../contrib/enrichment/HBaseLoaderTest.java | 162 ----------
.../contrib/enrichment/JDBCLoaderTest.java | 179 -----------
.../enrichment/MapEnrichmentOperatorTest.java | 152 ---------
contrib/src/test/resources/productmapping.txt | 100 ++++++
.../com/datatorrent/lib/util/FieldInfo.java | 15 +-
32 files changed, 2388 insertions(+), 1703 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
new file mode 100644
index 0000000..cdefddf
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.NotNull;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.db.cache.CacheManager;
+import com.datatorrent.lib.db.cache.CacheStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+
+/**
+ * Base class for Enrichment Operator. Subclasses should provide implementation to getKey and convert.
+ * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
+ * <p>
+ * Properties:<br>
+ * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
+ * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
+ * <b>store</b>: Specify the type of loader for looking data<br>
+ * <br>
+ *
+ * @param <INPUT> Type of tuples which are received by this operator
+ * @param <OUTPUT> Type of tuples which are emitted by this operator
+ * @displayName Abstract Enrichment Operator
+ * @tags Enrichment
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
+{
+ /**
+ * Mandatory parameters for Enricher
+ */
+ @NotNull
+ protected List<String> lookupFields;
+ @NotNull
+ protected List<String> includeFields;
+ @NotNull
+ private BackendLoader store;
+
+ /**
+ * Optional parameters for enricher.
+ */
+ private int cacheExpirationInterval = 1 * 60 * 60 * 1000; // 1 hour
+ private int cacheCleanupInterval = 1 * 60 * 60 * 1000; // 1 hour
+ private int cacheSize = 1024; // 1024 records
+
+ /**
+ * Helper variables.
+ */
+ private transient CacheManager cacheManager;
+ protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
+ protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
+
+ /**
+ * This method needs to be called by implementing class for processing a tuple for enrichment.
+ * The method will take the tuple through following stages:
+ * <ol>
+ * <li>Call {@link #getKey(Object)} to retrieve key fields for lookup</li>
+ * <li>Using key fields call cache manager to retrieve for any key that is cached already</li>
+ * <li>If not found in cache, it'll do a lookup in configured backend store</li>
+ * <li>The retrieved fields will be passed to {@link #convert(Object, Object)} method to create the final object</li>
+ * <li>Finally {@link #emitEnrichedTuple(Object)} is called for emitting the tuple</li>
+ * </ol>
+ *
+ * @param tuple Input tuple that needs to get processed for enrichment.
+ */
+ protected void enrichTuple(INPUT tuple)
+ {
+ Object key = getKey(tuple);
+ if (key != null) {
+ Object result = cacheManager.get(key);
+ OUTPUT out = convert(tuple, result);
+ if (out != null) {
+ emitEnrichedTuple(out);
+ }
+ }
+ }
+
+ /**
+ * The method should be implemented by concrete class which returns an ArrayList<Object> containing all the fields
+ * which forms key part of lookup.
+ * The order of field values should be same as the one set in {@link #lookupFields} variable.
+ *
+ * @param tuple Input tuple from which fields values for key needs to be fetched.
+ * @return Should return ArrayList<Object> which has fields values forming keys in same order as {@link #lookupFields}
+ */
+ protected abstract Object getKey(INPUT tuple);
+
+ /**
+ * The method should be implemented by concrete class.
+ * This method is expected to take input tuple and an externally fetched object containing fields to be enriched, and
+ * return an enriched tuple which is ready to be emitted.
+ *
+ * @param in Input tuple which needs to be enriched.
+ * @param cached ArrayList<Object> containing missing data retrieved from external sources.
+ * @return Enriched tuple of type OUTPUT
+ */
+ protected abstract OUTPUT convert(INPUT in, Object cached);
+
+ /**
+ * This method should be implemented by concrete class.
+ * The method is expected to emit tuple of type OUTPUT
+ *
+ * @param tuple Tuple of type OUTPUT that should be emitted.
+ */
+ protected abstract void emitEnrichedTuple(OUTPUT tuple);
+
+ /**
+ * This method should be implemented by concrete method.
+ * The method should return Class type of field for given fieldName from output tuple.
+ *
+ * @param fieldName Field name for which field type needs to be identified
+ * @return Class type for given field.
+ */
+ protected abstract Class<?> getIncludeFieldType(String fieldName);
+
+ /**
+ * This method should be implemented by concrete method.
+ * The method should return Class type of field for given fieldName from input tuple.
+ *
+ * @param fieldName Field name for which field type needs to be identified
+ * @return Class type for given field.
+ */
+ protected abstract Class<?> getLookupFieldType(String fieldName);
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+   super.setup(context);
+
+   cacheManager = new NullValuesCacheManager();
+
+   // In-memory primary cache. Entries expire a fixed time after they were written; the expiry
+   // interval, cleanup interval and capacity come from the operator properties
+   // (defaults: 1 hour, 1 hour and 1024 entries).
+   final CacheStore inMemoryStore = new CacheStore();
+   inMemoryStore.setEntryExpiryStrategy(CacheStore.ExpiryType.EXPIRE_AFTER_WRITE);
+   inMemoryStore.setEntryExpiryDurationInMillis(cacheExpirationInterval);
+   inMemoryStore.setCacheCleanupInMillis(cacheCleanupInterval);
+   inMemoryStore.setMaxCacheSize(cacheSize);
+
+   // The configured backend loader acts as the backup behind the primary cache.
+   cacheManager.setPrimary(inMemoryStore);
+   cacheManager.setBackup(store);
+ }
+
+ /**
+  * Builds the {@link FieldInfo} descriptions for the configured lookup and include fields, hands them to
+  * the backend store and then initializes the cache manager.
+  *
+  * @param context Context passed by the platform on activation; not used by this method.
+  * @throws RuntimeException if the cache manager fails to initialize with an {@link IOException}.
+  */
+ @Override
+ public void activate(Context context)
+ {
+   // Rebuild the lists from scratch so that activating the same instance again does not append
+   // duplicate entries. The lists are cleared in place because the store receives these references.
+   lookupFieldInfo.clear();
+   includeFieldInfo.clear();
+
+   for (String s : lookupFields) {
+     lookupFieldInfo.add(new FieldInfo(s, s, SupportType.getFromJavaType(getLookupFieldType(s))));
+   }
+
+   if (includeFields != null) {
+     for (String s : includeFields) {
+       includeFieldInfo.add(new FieldInfo(s, s, SupportType.getFromJavaType(getIncludeFieldType(s))));
+     }
+   }
+
+   // The store must know the fields before the cache manager is initialized.
+   store.setFieldInfo(lookupFieldInfo, includeFieldInfo);
+
+   try {
+     cacheManager.initialize();
+   } catch (IOException e) {
+     throw new RuntimeException("Unable to initialize primary cache", e);
+   }
+ }
+
+ /**
+ * Called when the operator is deactivated. This implementation does nothing.
+ */
+ @Override
+ public void deactivate()
+ {
+ // NOTE(review): the cache manager initialized in activate() is not released here;
+ // confirm whether it (and the backend store connection) should be closed on deactivation.
+ }
+
+ /**
+ * Returns a list of fields which are used for lookup.
+ *
+ * @return List of fields
+ */
+ public List<String> getLookupFields()
+ {
+ return lookupFields;
+ }
+
+ /**
+ * Set fields on which lookup needs to happen in external store.
+ * This is a mandatory parameter.
+ *
+ * @param lookupFields List of fields on which lookup happens.
+ */
+ public void setLookupFields(List<String> lookupFields)
+ {
+ this.lookupFields = lookupFields;
+ }
+
+ /**
+ * Returns a list of fields using which tuple is enriched
+ *
+ * @return List of fields.
+ */
+ public List<String> getIncludeFields()
+ {
+ return includeFields;
+ }
+
+ /**
+ * Sets list of fields to be fetched from external store for enriching the tuple.
+ * This is a mandatory parameter.
+ *
+ * @param includeFields List of fields.
+ */
+ public void setIncludeFields(List<String> includeFields)
+ {
+ this.includeFields = includeFields;
+ }
+
+ /**
+ * Returns the backend store which will enrich the tuple.
+ *
+ * @return Object of type {@link BackendLoader}
+ */
+ public BackendLoader getStore()
+ {
+ return store;
+ }
+
+ /**
+ * Sets backend store which will enrich the tuple.
+ * This is a mandatory parameter.
+ *
+ * @param store Object of type {@link BackendLoader}
+ */
+ public void setStore(BackendLoader store)
+ {
+ this.store = store;
+ }
+
+ /**
+ * Returns cache entry expiration interval in ms.
+ * This is an optional parameter.
+ *
+ * @return Cache entry expiration interval in ms
+ */
+ public int getCacheExpirationInterval()
+ {
+ return cacheExpirationInterval;
+ }
+
+ /**
+ * Sets cache entry expiration interval in ms.
+ * This is an optional parameter.
+ *
+ * @param cacheExpirationInterval Cache entry expiration interval in ms
+ */
+ public void setCacheExpirationInterval(int cacheExpirationInterval)
+ {
+ this.cacheExpirationInterval = cacheExpirationInterval;
+ }
+
+ /**
+ * Returns cache cleanup interval in ms. After this interval, cache cleanup operation will be performed.
+ * This is an optional parameter.
+ *
+ * @return cache cleanup interval in ms.
+ */
+ public int getCacheCleanupInterval()
+ {
+ return cacheCleanupInterval;
+ }
+
+ /**
+ * Set Cache cleanup interval in ms. After this interval, cache cleanup operation will be performed.
+ * This is an optional parameter.
+ *
+ * @param cacheCleanupInterval cache cleanup interval in ms.
+ */
+ public void setCacheCleanupInterval(int cacheCleanupInterval)
+ {
+ this.cacheCleanupInterval = cacheCleanupInterval;
+ }
+
+ /**
+ * Get size (number of entries) of cache.
+ *
+ * @return Number of entries allowed in cache.
+ */
+ public int getCacheSize()
+ {
+ return cacheSize;
+ }
+
+ /**
+ * Set size (number of entries) of cache.
+ *
+ * @param cacheSize Number of entries allowed in cache.
+ */
+ public void setCacheSize(int cacheSize)
+ {
+ this.cacheSize = cacheSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/BackendLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/BackendLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/BackendLoader.java
new file mode 100644
index 0000000..9570329
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/BackendLoader.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.db.cache.CacheManager;
+import com.datatorrent.lib.util.FieldInfo;
+
+/**
+ * Interface for the store used as lookup backend during enrichment.
+ * <p>
+ * It extends {@link CacheManager.Backup} with a way to tell the store which fields form the
+ * lookup key and which fields should be fetched.
+ */
+@InterfaceStability.Evolving
+public interface BackendLoader extends CacheManager.Backup
+{
+ /**
+ * Set {@link FieldInfo} for the lookup fields and for the include fields.
+ * Calling this method is mandatory for correct functioning of the backend loader.
+ *
+ * @param lookupFieldInfo List of {@link FieldInfo} that will be used as key in lookup.
+ * @param includeFieldInfo List of {@link FieldInfo} that will be retrieved from store.
+ */
+ void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
new file mode 100644
index 0000000..71d3dce
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.esotericsoftware.kryo.NotNull;
+import com.google.common.collect.Maps;
+import com.datatorrent.lib.db.cache.CacheManager;
+import com.datatorrent.lib.util.FieldInfo;
+
+
+/**
+ * This implementation of {@link BackendLoader} loads the data from a given file into memory cache and serves queries
+ * from the cache.
+ * When this is set as primaryCache in {@link CacheManager}, CacheManager can call {@link #loadInitialData()}
+ * periodically to reload the file.
+ * <p>
+ * The format of the input file is:
+ * <p>
+ * {"productCategory": 5, "productId": 0}
+ * {"productCategory": 4, "productId": 1}
+ * {"productCategory": 5, "productId": 2}
+ * {"productCategory": 5, "productId": 3}
+ * </p>
+ * Each line in the input file should be a valid json object which represents a record and each key/value pair in that
+ * json object represents the fields/value.
+ * <p>
+ * NOTE: This loader should be used with caution as all the data present in the file is loaded in memory because of
+ * which the memory consumption may go up.
+ */
+@InterfaceStability.Evolving
+public class FSLoader extends ReadOnlyBackup
+{
+ @NotNull
+ private String fileName;
+
+ private transient Path filePath;
+ private transient FileSystem fs;
+ private transient boolean connected;
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+ private static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>(){});
+ private static final Logger logger = LoggerFactory.getLogger(FSLoader.class);
+
+ public String getFileName()
+ {
+ return fileName;
+ }
+
+ public void setFileName(String fileName)
+ {
+ this.fileName = fileName;
+ }
+
+ /**
+  * Reads the configured file line by line, parses every line as a JSON object and builds a map from
+  * the lookup-field values of a record to its include-field values.
+  * <p>
+  * Blank lines are skipped. Lines that are not valid JSON are logged and skipped.
+  *
+  * @return Map of lookup key (list of lookup field values) to list of include field values.
+  * @throws RuntimeException if the file cannot be read or the file system cannot be closed.
+  */
+ @Override
+ public Map<Object, Object> loadInitialData()
+ {
+   Map<Object, Object> result = Maps.newHashMap();
+   FSDataInputStream in = null;
+   BufferedReader bin = null;
+   try {
+     in = fs.open(filePath);
+     // JSON text is UTF-8 encoded; do not depend on the platform default charset.
+     bin = new BufferedReader(new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8));
+     String line;
+     while ((line = bin.readLine()) != null) {
+       if (line.trim().isEmpty()) {
+         // An empty line carries no record; skip it instead of failing the whole load.
+         continue;
+       }
+       try {
+         Map<String, Object> tuple = reader.readValue(line);
+         result.put(getKey(tuple), getValue(tuple));
+       } catch (JsonProcessingException parseExp) {
+         // Keep loading the remaining records, but record why this one was rejected.
+         logger.warn("Unable to parse line {}", line, parseExp);
+       }
+     }
+   } catch (IOException e) {
+     throw new RuntimeException(e);
+   } finally {
+     // closeQuietly is null-safe, so no explicit null checks are needed.
+     IOUtils.closeQuietly(bin);
+     IOUtils.closeQuietly(in);
+     // NOTE(review): the file system is closed after the load, so this method cannot be called a
+     // second time without reconnecting; confirm this is intended.
+     try {
+       fs.close();
+     } catch (IOException e) {
+       throw new RuntimeException(e);
+     }
+   }
+   logger.debug("loading initial data {}", result.size());
+   return result;
+ }
+
+ private Object getValue(Map<String, Object> tuple)
+ {
+ ArrayList<Object> includeTuple = new ArrayList<Object>();
+ for (FieldInfo s : includeFieldInfo) {
+ includeTuple.add(tuple.get(s.getColumnName()));
+ }
+ return includeTuple;
+ }
+
+ private Object getKey(Map<String, Object> tuple)
+ {
+ ArrayList<Object> list = new ArrayList<Object>();
+ for (FieldInfo key : lookupFieldInfo) {
+ list.add(tuple.get(key.getColumnName()));
+ }
+ return list;
+ }
+
+ /**
+ * Single-key lookup is not served by this loader; the data is provided in bulk through
+ * {@link #loadInitialData()}.
+ *
+ * @param key Lookup key (ignored).
+ * @return Always null.
+ */
+ @Override
+ public Object get(Object key)
+ {
+ return null;
+ }
+
+ /**
+ * Multi-key lookup is not served by this loader; the data is provided in bulk through
+ * {@link #loadInitialData()}.
+ *
+ * @param keys Lookup keys (ignored).
+ * @return Always null.
+ */
+ @Override
+ public List<Object> getAll(List<Object> keys)
+ {
+ return null;
+ }
+
+ @Override
+ public void connect() throws IOException
+ {
+ Configuration conf = new Configuration();
+ this.filePath = new Path(fileName);
+ this.fs = FileSystem.newInstance(filePath.toUri(), conf);
+ if (!fs.isFile(filePath)) {
+ throw new IOException("Provided path " + fileName + " is not a file");
+ }
+ connected = true;
+ }
+
+ /**
+  * Closes the file system, if one was created, and marks this loader as disconnected.
+  *
+  * @throws IOException if closing the file system fails; the loader is still marked disconnected.
+  */
+ @Override
+ public void disconnect() throws IOException
+ {
+   try {
+     if (fs != null) {
+       fs.close();
+     }
+   } finally {
+     // Without this reset isConnected() would keep reporting true after a disconnect.
+     connected = false;
+   }
+ }
+
+ @Override
+ public boolean isConnected()
+ {
+ return connected;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java
new file mode 100644
index 0000000..d20e87b
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.util.FieldInfo;
+
+/**
+ * <p>JDBCLoader extends {@link JdbcStore}, uses JDBC to connect and implements the {@link BackendLoader} interface.</p> <br/>
+ * <p>
+ * Properties:<br>
+ * <b>queryStmt</b>: Sql Prepared Statement which needs to be executed<br>
+ * <b>tableName</b>: JDBC table name<br>
+ * <br>
+ */
+@InterfaceStability.Evolving
+public class JDBCLoader extends JdbcStore implements BackendLoader
+{
+ protected String queryStmt;
+
+ protected String tableName;
+
+ protected transient List<FieldInfo> includeFieldInfo;
+ protected transient List<FieldInfo> lookupFieldInfo;
+
+ /**
+  * Executes the configured query statement with the given key values bound as parameters.
+  * <p>
+  * On success the statement stays open because the returned result set depends on it.
+  *
+  * @param key List of key values, bound in order to the parameters of the statement.
+  * @return The {@link ResultSet} produced by the query.
+  * @throws RuntimeException wrapping the {@link SQLException} if preparing, binding or executing fails.
+  */
+ protected Object getQueryResult(Object key)
+ {
+   // Any List works as key; cast before preparing so a wrong key type cannot leak a statement.
+   List<?> keys = (List<?>)key;
+   PreparedStatement getStatement = null;
+   try {
+     getStatement = getConnection().prepareStatement(queryStmt);
+     for (int i = 0; i < keys.size(); i++) {
+       getStatement.setObject(i + 1, keys.get(i));
+     }
+     return getStatement.executeQuery();
+   } catch (SQLException e) {
+     // No result set is handed out on failure, so the statement must be released here.
+     if (getStatement != null) {
+       try {
+         getStatement.close();
+       } catch (SQLException closeEx) {
+         logger.warn("Unable to close statement after failed lookup", closeEx);
+       }
+     }
+     throw new RuntimeException(e);
+   }
+ }
+
+ protected ArrayList<Object> getDataFrmResult(Object result) throws RuntimeException
+ {
+ try {
+ ResultSet resultSet = (ResultSet)result;
+ if (resultSet.next()) {
+ ResultSetMetaData rsdata = resultSet.getMetaData();
+ // If the includefields is empty, populate it from ResultSetMetaData
+ if (CollectionUtils.isEmpty(includeFieldInfo)) {
+ if (includeFieldInfo == null) {
+ includeFieldInfo = new ArrayList<>();
+ }
+ for (int i = 1; i <= rsdata.getColumnCount(); i++) {
+ String columnName = rsdata.getColumnName(i);
+ // TODO: Take care of type conversion.
+ includeFieldInfo.add(new FieldInfo(columnName, columnName, FieldInfo.SupportType.OBJECT));
+ }
+ }
+
+ ArrayList<Object> res = new ArrayList<Object>();
+ for (FieldInfo f : includeFieldInfo) {
+ res.add(getConvertedData(resultSet.getObject(f.getColumnName()), f));
+ }
+ return res;
+ } else {
+ return null;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+  * Checks a value read from the database against the declared type of its field.
+  *
+  * @param object Value read from the result set; may be null for SQL NULL.
+  * @param f Field description carrying the expected type.
+  * @return The value itself when it is an instance of the expected type, otherwise null.
+  */
+ private Object getConvertedData(Object object, FieldInfo f)
+ {
+   if (object == null) {
+     // SQL NULL: nothing to check, and object.getClass() would throw.
+     return null;
+   }
+
+   // isInstance also accepts subtypes, so every value matches a field declared as Object.
+   if (f.getType().getJavaType().isInstance(object)) {
+     return object;
+   }
+
+   logger.warn("Type mismatch seen for field {} (found {}), returning null", f.getColumnName(),
+       object.getClass().getName());
+   return null;
+ }
+
+ private String generateQueryStmt()
+ {
+ String stmt = "select * from " + tableName + " where ";
+ boolean first = true;
+ for (FieldInfo fieldInfo : lookupFieldInfo) {
+ if (first) {
+ first = false;
+ } else {
+ stmt += " and ";
+ }
+ stmt += fieldInfo.getColumnName() + " = ?";
+ }
+
+ logger.info("generateQueryStmt: {}", stmt);
+ return stmt;
+ }
+
+ public String getQueryStmt()
+ {
+ return queryStmt;
+ }
+
+ /**
+ * Set the sql Prepared Statement if the enrichment mechanism is query based.
+ */
+ public void setQueryStmt(String queryStmt)
+ {
+ this.queryStmt = queryStmt;
+ }
+
+ public String getTableName()
+ {
+ return tableName;
+ }
+
+ /**
+ * Set the table name.
+ */
+ public void setTableName(String tableName)
+ {
+ this.tableName = tableName;
+ }
+
+ /**
+ * Stores the lookup and include field descriptions used by this loader. If no query statement
+ * has been set, a default one is generated from the table name and the lookup fields.
+ *
+ * @param lookupFieldInfo List of {@link FieldInfo} that will be used as key in lookup.
+ * @param includeFieldInfo List of {@link FieldInfo} that will be retrieved from store.
+ */
+ @Override
+ public void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo)
+ {
+ this.lookupFieldInfo = lookupFieldInfo;
+ this.includeFieldInfo = includeFieldInfo;
+ // A statement set explicitly takes precedence; generation reads lookupFieldInfo assigned above.
+ if (queryStmt == null) {
+ queryStmt = generateQueryStmt();
+ }
+ }
+
+ @Override
+ public Map<Object, Object> loadInitialData()
+ {
+ return null;
+ }
+
+ /**
+  * Looks up the include-field values for the given key by running the lookup query.
+  * <p>
+  * The result set and its statement are released once the values have been extracted.
+  *
+  * @param key List of key values for the lookup query.
+  * @return One value per include field, or null when no row matches.
+  */
+ @Override
+ public Object get(Object key)
+ {
+   Object queryResult = getQueryResult(key);
+   try {
+     return getDataFrmResult(queryResult);
+   } finally {
+     // Every lookup opens a statement and a result set; release both to avoid leaking them.
+     if (queryResult instanceof ResultSet) {
+       ResultSet resultSet = (ResultSet)queryResult;
+       try {
+         // Fetch the statement first: it cannot be obtained from a closed result set.
+         java.sql.Statement statement = resultSet.getStatement();
+         resultSet.close();
+         if (statement != null) {
+           statement.close();
+         }
+       } catch (SQLException e) {
+         logger.warn("Unable to release JDBC resources after lookup", e);
+       }
+     }
+   }
+ }
+
+ /**
+  * Looks up every given key through {@link #get(Object)}.
+  *
+  * @param keys Keys to look up.
+  * @return Lookup results in the order of the keys; an entry is null when its key has no match.
+  */
+ @Override
+ public List<Object> getAll(List<Object> keys)
+ {
+   final List<Object> fetched = new ArrayList<>();
+   for (Object lookupKey : keys) {
+     final Object found = get(lookupKey);
+     fetched.add(found);
+   }
+   return fetched;
+ }
+
+ @Override
+ public void put(Object key, Object value)
+ {
+ throw new UnsupportedOperationException("Not supported operation");
+ }
+
+ @Override
+ public void putAll(Map<Object, Object> m)
+ {
+ throw new UnsupportedOperationException("Not supported operation");
+ }
+
+ @Override
+ public void remove(Object key)
+ {
+ throw new UnsupportedOperationException("Not supported operation");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java
new file mode 100644
index 0000000..ecf16ba
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.util.FieldInfo;
+
+/**
+ * This class takes a HashMap tuple as input and extracts value of the lookupKey configured
+ * for this operator. It perform a lookup using {@link com.datatorrent.lib.db.cache.CacheManager} to
+ * find a matching entry and adds the result to the original tuple.
+ *
+ * <p>
+ * Example:
+ * Lets say, input tuple is
+ * { amount=10.0, channelId=4, productId=3 }
+ * The tuple is modified as below:
+ * { amount=10.0, channelId=4, productId=3, <b>productCategory=1 </b>}
+ * </p>
+ *
+ * @displayName MapEnricher
+ * @category Database
+ * @tags enrichment, lookup, map
+ */
+@InterfaceStability.Evolving
+public class MapEnricher extends AbstractEnricher<Map<String, Object>, Map<String, Object>>
+{
+ public final transient DefaultInputPort<Map<String, Object>> input = new DefaultInputPort<Map<String, Object>>()
+ {
+ @Override
+ public void process(Map<String, Object> obj)
+ {
+ processTuple(obj);
+ }
+ };
+
+ public final transient DefaultOutputPort<Map<String, Object>> output = new DefaultOutputPort<>();
+
+ protected void processTuple(Map<String, Object> obj)
+ {
+ enrichTuple(obj);
+ }
+
+ @Override
+ protected Object getKey(Map<String, Object> tuple)
+ {
+ ArrayList<Object> keyList = new ArrayList<Object>();
+
+ for (FieldInfo fieldInfo : lookupFieldInfo) {
+ keyList.add(tuple.get(fieldInfo.getColumnName()));
+ }
+
+ return keyList;
+ }
+
+ /**
+  * Adds the fetched values to the input map, one entry per include field.
+  * The input map is modified in place and returned.
+  *
+  * @param in Input tuple which needs to be enriched.
+  * @param cached List of fetched values in the order of the include fields, or null when the
+  *               lookup found nothing.
+  * @return The input map, enriched when fetched values are available.
+  */
+ @Override
+ protected Map<String, Object> convert(Map<String, Object> in, Object cached)
+ {
+   if (cached == null) {
+     // Nothing was found for this tuple; pass it on unchanged.
+     return in;
+   }
+
+   // Any List works here; only positional access is needed.
+   List<?> newAttributes = (List<?>)cached;
+   for (int i = 0; i < includeFieldInfo.size(); i++) {
+     in.put(includeFieldInfo.get(i).getColumnName(), newAttributes.get(i));
+   }
+   return in;
+ }
+
+ @Override
+ protected void emitEnrichedTuple(Map<String, Object> tuple)
+ {
+ output.emit(tuple);
+ }
+
+ @Override
+ protected Class<?> getIncludeFieldType(String fieldName)
+ {
+ return Object.class;
+ }
+
+ @Override
+ protected Class<?> getLookupFieldType(String fieldName)
+ {
+ return Object.class;
+ }
+
+ /**
+ * Set fields against which lookup will be performed.
+ * This is a mandatory parameter to set.
+ *
+ * @param lookupFields List of fields on which lookup happens.
+ * @description $[] Field which become part of lookup key
+ */
+ @Override
+ public void setLookupFields(List<String> lookupFields)
+ {
+ super.setLookupFields(lookupFields);
+ }
+
+ /**
+ * Set fields which will enrich the map.
+ * This is a mandatory parameter to set.
+ *
+ * @param includeFields List of fields.
+ * @description $[] Field which are fetched from store
+ */
+ @Override
+ public void setIncludeFields(List<String> includeFields)
+ {
+ super.setIncludeFields(includeFields);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java
new file mode 100644
index 0000000..2cf7326
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.db.cache.CacheManager;
+
+/**
+ * Null Values Cache Manager. A key that is missing from the backup store is cached in the primary store as an explicit NULL marker.
+ */
+@InterfaceStability.Evolving
+public class NullValuesCacheManager extends CacheManager
+{
+
+ private static final NullObject NULL = new NullObject();
+
+ @Override
+ public Object get(Object key)
+ {
+ Object primaryVal = primary.get(key);
+ if (primaryVal != null) {
+ if (primaryVal == NULL) {
+ return null;
+ }
+
+ return primaryVal;
+ }
+
+ Object backupVal = backup.get(key);
+ if (backupVal != null) {
+ primary.put(key, backupVal);
+ } else {
+ primary.put(key, NULL);
+ }
+
+ return backupVal;
+ }
+
+ private static class NullObject
+ {
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java
new file mode 100644
index 0000000..782fbc5
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.PojoUtils;
+
+
+/**
+ * This class takes a POJO as input and extracts the value of the lookupKey configured
+ * for this operator. It performs a lookup using {@link com.datatorrent.lib.db.cache.CacheManager} to
+ * find a matching entry and adds the result to the original tuple.
+ *
+ * <p>
+ * Properties:<br>
+ * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
+ * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
+ * <br>
+ * <p>
+ *
+ * <p>
+ * Example:
+ * Let's say the input tuple is
+ * { amount=10.0, channelId=4, productId=3 }
+ * The tuple is modified as below:
+ * { amount=10.0, channelId=4, productId=3, <b>productCategory=1 </b>}
+ * </p>
+ *
+ * @displayName POJOEnricher
+ * @category Database
+ * @tags enrichment, enricher, pojo, schema, lookup
+ */
+@InterfaceStability.Evolving
+public class POJOEnricher extends AbstractEnricher<Object, Object>
+{
+ private static final Logger logger = LoggerFactory.getLogger(POJOEnricher.class);
+
+ /**
+ * Helper fields
+ */
+ protected Class<?> inputClass;
+ protected Class<?> outputClass;
+ private transient Map<PojoUtils.Getter, PojoUtils.Setter> fieldMap = new HashMap<>();
+ private transient List<PojoUtils.Setter> includeSetters = new ArrayList<>();
+ private transient List<PojoUtils.Getter> lookupGetters = new ArrayList<>();
+
+ /**
+ * AutoMetrics
+ */
+ @AutoMetric
+ private int enrichedTupleCount;
+ @AutoMetric
+ private int errorTupleCount;
+
+ @InputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+
+ @Override
+ public void process(Object object)
+ {
+ processTuple(object);
+ }
+ };
+
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
+ {
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+ };
+
+ public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<>();
+
+ protected void processTuple(Object object)
+ {
+ enrichTuple(object);
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ enrichedTupleCount = 0;
+ errorTupleCount = 0;
+ }
+
+ @Override
+ protected Object getKey(Object tuple)
+ {
+ ArrayList<Object> keyList = new ArrayList<>();
+ for (PojoUtils.Getter lookupGetter : lookupGetters) {
+ keyList.add(lookupGetter.get(tuple));
+ }
+ return keyList;
+ }
+
+ @Override
+ protected Object convert(Object in, Object cached)
+ {
+ Object o;
+
+ try {
+ o = outputClass.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ logger.error("Failed to create new instance of output POJO", e);
+ errorTupleCount++;
+ error.emit(in);
+ return null;
+ }
+
+ try {
+ for (Map.Entry<PojoUtils.Getter, PojoUtils.Setter> entry : fieldMap.entrySet()) {
+ entry.getValue().set(o, entry.getKey().get(in));
+ }
+ } catch (RuntimeException e) {
+ logger.error("Failed to set the property. Continuing with default.", e);
+ errorTupleCount++;
+ error.emit(in);
+ return null;
+ }
+
+ if (cached == null) {
+ return o;
+ }
+
+ ArrayList<Object> includeObjects = (ArrayList<Object>)cached;
+ for (int i = 0; i < includeSetters.size(); i++) {
+ try {
+ includeSetters.get(i).set(o, includeObjects.get(i));
+ } catch (RuntimeException e) {
+ logger.error("Failed to set the property. Continuing with default.", e);
+ errorTupleCount++;
+ error.emit(in);
+ return null;
+ }
+ }
+
+ return o;
+ }
+
+ @Override
+ protected void emitEnrichedTuple(Object tuple)
+ {
+ output.emit(tuple);
+ enrichedTupleCount++;
+ }
+
+ @Override
+ protected Class<?> getIncludeFieldType(String fieldName)
+ {
+ try {
+ return outputClass.getDeclaredField(fieldName).getType();
+ } catch (NoSuchFieldException e) {
+ logger.warn("Failed to find given fieldName, returning object type", e);
+ return Object.class;
+ }
+ }
+
+ @Override
+ protected Class<?> getLookupFieldType(String fieldName)
+ {
+ try {
+ return inputClass.getDeclaredField(fieldName).getType();
+ } catch (NoSuchFieldException e) {
+ logger.warn("Failed to find given fieldName, returning object type", e);
+ return Object.class;
+ }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private PojoUtils.Setter generateSettersForField(Class<?> klass, String outputFieldName)
+ throws NoSuchFieldException, SecurityException
+ {
+ Field f = klass.getDeclaredField(outputFieldName);
+ Class c = ClassUtils.primitiveToWrapper(f.getType());
+ return PojoUtils.createSetter(klass, outputFieldName, c);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private PojoUtils.Getter generateGettersForField(Class<?> klass, String inputFieldName)
+ throws NoSuchFieldException, SecurityException
+ {
+ Field f = klass.getDeclaredField(inputFieldName);
+ Class c = ClassUtils.primitiveToWrapper(f.getType());
+ return PojoUtils.createGetter(klass, inputFieldName, c);
+ }
+
+ @Override
+ public void activate(Context context)
+ {
+ super.activate(context);
+
+ for (Field field : inputClass.getDeclaredFields()) {
+ try {
+ fieldMap.put(generateGettersForField(inputClass, field.getName()),
+ generateSettersForField(outputClass, field.getName()));
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException("Unable to find field with name " + field.getName() + ", ignoring that field.", e);
+ }
+ }
+
+ for (FieldInfo fieldInfo : this.includeFieldInfo) {
+ try {
+ includeSetters.add(generateSettersForField(outputClass, fieldInfo.getColumnName()));
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException("Given field name is not present in output POJO", e);
+ }
+ }
+
+ for (FieldInfo fieldInfo : this.lookupFieldInfo) {
+ try {
+ lookupGetters.add(generateGettersForField(inputClass, fieldInfo.getColumnName()));
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException("Given lookup field is not present in POJO", e);
+ }
+ }
+ }
+
+ /**
+ * Set the fields on which the lookup will be performed.
+ * This is a mandatory parameter to set.
+ *
+ * @param lookupFields List of fields on which lookup happens.
+ * @description $[] Field which become part of lookup key
+ * @useSchema $[] input.fields[].name
+ */
+ @Override
+ public void setLookupFields(List<String> lookupFields)
+ {
+ super.setLookupFields(lookupFields);
+ }
+
+ /**
+ * Set fields which will enrich the POJO.
+ * This is a mandatory parameter to set.
+ *
+ * @param includeFields List of fields.
+ * @description $[] Field which are fetched from store
+ * @useSchema $[] output.fields[].name
+ */
+ @Override
+ public void setIncludeFields(List<String> includeFields)
+ {
+ super.setIncludeFields(includeFields);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/ReadOnlyBackup.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/ReadOnlyBackup.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/ReadOnlyBackup.java
new file mode 100644
index 0000000..157dbc9
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/ReadOnlyBackup.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.util.FieldInfo;
+
+/**
+ * ReadOnly abstract implementation of BackendLoader.
+ */
+@InterfaceStability.Evolving
+public abstract class ReadOnlyBackup implements BackendLoader
+{
+ protected transient List<FieldInfo> includeFieldInfo;
+ protected transient List<FieldInfo> lookupFieldInfo;
+
+ @Override
+ public void put(Object key, Object value)
+ {
+ throw new UnsupportedOperationException("Not supported operation");
+ }
+
+ @Override
+ public void putAll(Map<Object, Object> m)
+ {
+ throw new UnsupportedOperationException("Not supported operation");
+ }
+
+ @Override
+ public void remove(Object key)
+ {
+ throw new UnsupportedOperationException("Not supported operation");
+ }
+
+ @Override
+ public void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo)
+ {
+ this.includeFieldInfo = includeFieldInfo;
+ this.lookupFieldInfo = lookupFieldInfo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java
deleted file mode 100644
index cbd4d5e..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java
+++ /dev/null
@@ -1,170 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.lib.db.cache.CacheManager;
-import com.datatorrent.lib.db.cache.CacheStore;
-import com.datatorrent.lib.db.cache.CacheStore.ExpiryType;
-import com.esotericsoftware.kryo.NotNull;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Base class for Enrichment Operator. Subclasses should provide implementation to getKey and convert.
- * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
- *
- * Properties:<br>
- * <b>lookupFieldsStr</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
- * <b>includeFieldsStr</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
- * <b>store</b>: Specify the type of loader for looking data<br>
- * <br>
- *
- *
- * @displayName Abstract Enrichment Operator
- * @tags Enrichment
- * @param <INPUT> Type of tuples which are received by this operator</T>
- * @param <OUTPUT> Type of tuples which are emitted by this operator</T>
- * @since 2.1.0
- */
-public abstract class AbstractEnrichmentOperator<INPUT, OUTPUT> extends BaseOperator
-{
- /**
- * Keep lookup data cache for fast access.
- */
- private transient CacheManager cacheManager;
-
- private transient CacheStore primaryCache = new CacheStore();
-
- private int entryExpiryDurationInMillis = 24 * 60 * 60 * 1000;
- private int cacheCleanupInMillis = 24 * 60 * 60 * 1000;
- private int cacheSize = 1024;
-
- public transient DefaultOutputPort<OUTPUT> output = new DefaultOutputPort<OUTPUT>();
-
- @InputPortFieldAnnotation(optional = true)
- public transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>()
- {
- @Override public void process(INPUT tuple)
- {
- processTuple(tuple);
- }
- };
-
- private EnrichmentBackup store;
-
- @NotNull
- protected String lookupFieldsStr;
-
- protected String includeFieldsStr;
-
- protected transient List<String> lookupFields = new ArrayList<String>();
- protected transient List<String> includeFields = new ArrayList<String>();
-
- protected void processTuple(INPUT tuple) {
- Object key = getKey(tuple);
- if(key != null) {
- Object result = cacheManager.get(key);
- OUTPUT out = convert(tuple, result);
- emitTuple(out);
- }
- }
-
- protected abstract Object getKey(INPUT tuple);
-
- protected void emitTuple(OUTPUT tuple) {
- output.emit(tuple);
- }
-
- /* Add data from cached value to input field */
- protected abstract OUTPUT convert(INPUT in, Object cached);
-
- @Override public void setup(Context.OperatorContext context)
- {
- super.setup(context);
-
- cacheManager = new NullValuesCacheManager();
-
- // set expiration to one day.
- primaryCache.setEntryExpiryDurationInMillis(entryExpiryDurationInMillis);
- primaryCache.setCacheCleanupInMillis(cacheCleanupInMillis);
- primaryCache.setEntryExpiryStrategy(ExpiryType.EXPIRE_AFTER_WRITE);
- primaryCache.setMaxCacheSize(cacheSize);
-
- lookupFields = Arrays.asList(lookupFieldsStr.split(","));
- if (includeFieldsStr != null) {
- includeFields = Arrays.asList(includeFieldsStr.split(","));
- }
-
- try {
- store.setFields(lookupFields, includeFields);
-
- cacheManager.setPrimary(primaryCache);
- cacheManager.setBackup(store);
- cacheManager.initialize();
- } catch (IOException e) {
- throw new RuntimeException("Unable to initialize primary cache", e);
- }
- }
-
- /**
- * Set the type of backup store for storing and searching data.
- */
- public void setStore(EnrichmentBackup store) {
- this.store = store;
- }
-
- public EnrichmentBackup getStore() {
- return store;
- }
-
- public CacheStore getPrimaryCache()
- {
- return primaryCache;
- }
-
- public String getLookupFieldsStr()
- {
- return lookupFieldsStr;
- }
-
- /**
- * Set the lookup fields for quick searching. It would be in comma separated list
- */
- public void setLookupFieldsStr(String lookupFieldsStr)
- {
- this.lookupFieldsStr = lookupFieldsStr;
- }
-
- public String getIncludeFieldsStr()
- {
- return includeFieldsStr;
- }
-
- /**
- * Set the list of comma separated fields to be added/replaced to the incoming tuple.
- */
- public void setIncludeFieldsStr(String includeFieldsStr)
- {
- this.includeFieldsStr = includeFieldsStr;
- }
-
- public void setEntryExpiryDurationInMillis(int entryExpiryDurationInMillis)
- {
- this.entryExpiryDurationInMillis = entryExpiryDurationInMillis;
- }
-
- public void setCacheCleanupInMillis(int cacheCleanupInMillis)
- {
- this.cacheCleanupInMillis = cacheCleanupInMillis;
- }
-
- public void setCacheSize(int cacheSize)
- {
- this.cacheSize = cacheSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java
deleted file mode 100644
index 9155b13..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
- */
-
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.lib.db.cache.CacheManager;
-import java.util.List;
-/**
- * @since 3.1.0
- */
-
-public interface EnrichmentBackup extends CacheManager.Backup
-{
- public void setFields(List<String> lookupFields,List<String> includeFields);
- public boolean needRefresh();
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java
deleted file mode 100644
index 2effed0..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
- */
-package com.datatorrent.contrib.enrichment;
-
-import com.esotericsoftware.kryo.NotNull;
-import com.google.common.collect.Maps;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-import org.codehaus.jackson.type.TypeReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-/**
- * @since 3.1.0
- */
-
-public class FSLoader extends ReadOnlyBackup
-{
- @NotNull
- private String fileName;
-
- private transient Path filePath;
- private transient FileSystem fs;
- private transient boolean connected;
-
- private transient static final ObjectMapper mapper = new ObjectMapper();
- private transient static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>()
- {
- });
- private transient static final Logger logger = LoggerFactory.getLogger(FSLoader.class);
-
- public String getFileName()
- {
- return fileName;
- }
-
- public void setFileName(String fileName)
- {
- this.fileName = fileName;
- }
-
- @Override public Map<Object, Object> loadInitialData()
- {
- Map<Object, Object> result = null;
- FSDataInputStream in = null;
- BufferedReader bin = null;
- try {
- result = Maps.newHashMap();
- in = fs.open(filePath);
- bin = new BufferedReader(new InputStreamReader(in));
- String line;
- while ((line = bin.readLine()) != null) {
- try {
- Map<String, Object> tuple = reader.readValue(line);
- if(CollectionUtils.isEmpty(includeFields)) {
- if(includeFields == null)
- includeFields = new ArrayList<String>();
- for (Map.Entry<String, Object> e : tuple.entrySet()) {
- includeFields.add(e.getKey());
- }
- }
- ArrayList<Object> includeTuple = new ArrayList<Object>();
- for(String s: includeFields) {
- includeTuple.add(tuple.get(s));
- }
- result.put(getKey(tuple), includeTuple);
- } catch (JsonProcessingException parseExp) {
- logger.info("Unable to parse line {}", line);
- }
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- if(bin != null)
- IOUtils.closeQuietly(bin);
- if(in != null)
- IOUtils.closeQuietly(in);
- try {
- fs.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- logger.debug("loading initial data {}", result.size());
- return result;
- }
-
- private Object getKey(Map<String, Object> tuple)
- {
- ArrayList<Object> lst = new ArrayList<Object>();
- for(String key : lookupFields) {
- lst.add(tuple.get(key));
- }
- return lst;
- }
-
- @Override public Object get(Object key)
- {
- return null;
- }
-
- @Override public List<Object> getAll(List<Object> keys)
- {
- return null;
- }
-
- @Override public void connect() throws IOException
- {
- Configuration conf = new Configuration();
- this.filePath = new Path(fileName);
- this.fs = FileSystem.newInstance(filePath.toUri(), conf);
- if (!fs.isFile(filePath))
- throw new IOException("Provided path " + fileName + " is not a file");
- connected = true;
- }
-
- @Override public void disconnect() throws IOException
- {
- if (fs != null)
- fs.close();
- }
-
- @Override public boolean isConnected()
- {
- return connected;
- }
-
- @Override
- public boolean needRefresh() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java
deleted file mode 100755
index 7040a7a..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.contrib.hbase.HBaseStore;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * <p>HBaseLoader extends from {@link HBaseStore} uses HBase to connect and implements EnrichmentBackup interface.</p> <br/>
- *
- * Properties:<br>
- * <b>includeFamilys</b>: List of comma separated families and each family corresponds to the group name of column fields in includeFieldsStr. Ex: Family1,Family2<br>
- * <br>
- *
- * @displayName HBaseLoader
- * @tags Loader
- * @since 2.1.0
- */
-public class HBaseLoader extends HBaseStore implements EnrichmentBackup
-{
- protected transient List<String> includeFields;
- protected transient List<String> lookupFields;
- protected transient List<String> includeFamilys;
-
- protected Object getQueryResult(Object key)
- {
- try {
- Get get = new Get(getRowBytes(((ArrayList)key).get(0)));
- int idx = 0;
- for(String f : includeFields) {
- get.addColumn(Bytes.toBytes(includeFamilys.get(idx++)), Bytes.toBytes(f));
- }
- return getTable().get(get);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- protected ArrayList<Object> getDataFrmResult(Object result)
- {
- Result res = (Result)result;
- if (res == null || res.isEmpty())
- return null;
- ArrayList<Object> columnInfo = new ArrayList<Object>();
-
- if(CollectionUtils.isEmpty(includeFields)) {
- if(includeFields == null) {
- includeFields = new ArrayList<String>();
- includeFamilys.clear();
- includeFamilys = new ArrayList<String>();
- }
- for (KeyValue kv: res.raw()) {
- includeFields.add(new String(kv.getQualifier()));
- includeFamilys.add(new String(kv.getFamily()));
- }
- }
- for(KeyValue kv : res.raw()) {
- columnInfo.add(kv.getValue());
- }
- return columnInfo;
- }
-
- private byte[] getRowBytes(Object key)
- {
- return ((String)key).getBytes();
- }
-
- @Override public void setFields(List<String> lookupFields,List<String> includeFields)
- {
- this.includeFields = includeFields;
- this.lookupFields = lookupFields;
- }
-
- /**
- * Set the familyStr and would be in the form of comma separated list.
- */
- public void setIncludeFamilyStr(String familyStr)
- {
- this.includeFamilys = Arrays.asList(familyStr.split(","));
- }
-
- @Override public boolean needRefresh()
- {
- return false;
- }
-
- @Override public Map<Object, Object> loadInitialData()
- {
- return null;
- }
-
- @Override public Object get(Object key)
- {
- return getDataFrmResult(getQueryResult(key));
- }
-
- @Override public List<Object> getAll(List<Object> keys)
- {
- List<Object> values = Lists.newArrayList();
- for (Object key : keys) {
- values.add(get(key));
- }
- return values;
- }
-
- @Override public void put(Object key, Object value)
- {
- throw new RuntimeException("Not supported operation");
- }
-
- @Override public void putAll(Map<Object, Object> m)
- {
- throw new RuntimeException("Not supported operation");
- }
-
- @Override public void remove(Object key)
- {
- throw new RuntimeException("Not supported operation");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java
deleted file mode 100644
index 3b1a8cf..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java
+++ /dev/null
@@ -1,158 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.lib.db.jdbc.JdbcStore;
-import com.google.common.collect.Lists;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections.CollectionUtils;
-
-/**
- * <p>HBaseLoader extends from {@link JdbcStore} uses JDBC to connect and implements EnrichmentBackup interface.</p> <br/>
- *
- * Properties:<br>
- * <b>queryStmt</b>: Sql Prepared Statement which needs to be executed<br>
- * <b>tableName</b>: JDBC table name<br>
- * <br>
- *
- * @displayName JDBCLoader
- * @tags Loader
- * @since 2.1.0
- */
-public class JDBCLoader extends JdbcStore implements EnrichmentBackup
-{
- protected String queryStmt;
-
- protected String tableName;
-
- protected transient List<String> includeFields;
- protected transient List<String> lookupFields;
-
- protected Object getQueryResult(Object key)
- {
- try {
- PreparedStatement getStatement = getConnection().prepareStatement(queryStmt);
- ArrayList<Object> keys = (ArrayList<Object>) key;
- for (int i = 0; i < keys.size(); i++) {
- getStatement.setObject(i+1, keys.get(i));
- }
- return getStatement.executeQuery();
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
- protected ArrayList<Object> getDataFrmResult(Object result) throws RuntimeException
- {
- try {
- ResultSet resultSet = (ResultSet) result;
- if (resultSet.next()) {
- ResultSetMetaData rsdata = resultSet.getMetaData();
- // If the includefields is empty, populate it from ResultSetMetaData
- if(CollectionUtils.isEmpty(includeFields)) {
- if(includeFields == null)
- includeFields = new ArrayList<String>();
- for (int i = 1; i <= rsdata.getColumnCount(); i++) {
- includeFields.add(rsdata.getColumnName(i));
- }
- }
- ArrayList<Object> res = new ArrayList<Object>();
- for(String f : includeFields) {
- res.add(resultSet.getObject(f));
- }
- return res;
- } else
- return null;
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
- private String generateQueryStmt()
- {
- String stmt = "select * from " + tableName + " where ";
- for (int i = 0; i < lookupFields.size(); i++) {
- stmt = stmt + lookupFields.get(i) + " = ? ";
- if(i != lookupFields.size() - 1) {
- stmt = stmt + " and ";
- }
- }
- logger.info("generateQueryStmt: {}", stmt);
- return stmt;
- }
-
- public String getQueryStmt()
- {
- return queryStmt;
- }
-
- @Override
- public boolean needRefresh() {
- return false;
- }
-
- /**
- * Set the sql Prepared Statement if the enrichment mechanism is query based.
- */
- public void setQueryStmt(String queryStmt)
- {
- this.queryStmt = queryStmt;
- }
-
- public String getTableName()
- {
- return tableName;
- }
- /**
- * Set the table name.
- */
- public void setTableName(String tableName)
- {
- this.tableName = tableName;
- }
-
- @Override public void setFields(List<String> lookupFields,List<String> includeFields)
- {
- this.includeFields = includeFields;
- this.lookupFields = lookupFields;
- if(queryStmt == null)
- queryStmt = generateQueryStmt();
- }
- @Override public Map<Object, Object> loadInitialData()
- {
- return null;
- }
-
- @Override public Object get(Object key)
- {
- return getDataFrmResult(getQueryResult(key));
- }
-
- @Override public List<Object> getAll(List<Object> keys)
- {
- List<Object> values = Lists.newArrayList();
- for (Object key : keys) {
- values.add(get(key));
- }
- return values;
- }
-
- @Override public void put(Object key, Object value)
- {
- throw new RuntimeException("Not supported operation");
- }
-
- @Override public void putAll(Map<Object, Object> m)
- {
- throw new RuntimeException("Not supported operation");
- }
-
- @Override public void remove(Object key)
- {
- throw new RuntimeException("Not supported operation");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java
deleted file mode 100644
index 040b5ae..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-/**
- *
- * This class takes a HashMap tuple as input and extract the value of the lookupKey configured
- * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
- * specified in the file/DB or based on include fields are added to original tuple.
- *
- * Example
- * The file contains data in json format, one entry per line. during setup entire file is read and
- * kept in memory for quick lookup.
- * If file contains following lines, and operator is configured with lookup key "productId"
- * { "productId": 1, "productCategory": 3 }
- * { "productId": 4, "productCategory": 10 }
- * { "productId": 3, "productCategory": 1 }
- *
- * And input tuple is
- * { amount=10.0, channelId=4, productId=3 }
- *
- * The tuple is modified as below before operator emits it on output port.
- * { amount=10.0, channelId=4, productId=3, productCategory=1 }
- *
- *
- * @displayName MapEnrichment
- * @category Database
- * @tags enrichment, lookup
- *
- * @since 2.1.0
- */
-public class MapEnrichmentOperator extends AbstractEnrichmentOperator<Map<String, Object>, Map<String, Object>>
-{
- @Override protected Object getKey(Map<String, Object> tuple)
- {
- ArrayList<Object> keyList = new ArrayList<Object>();
- for(String key : lookupFields) {
- keyList.add(tuple.get(key));
- }
- return keyList;
- }
-
- @Override protected Map<String, Object> convert(Map<String, Object> in, Object cached)
- {
- if (cached == null)
- return in;
-
- ArrayList<Object> newAttributes = (ArrayList<Object>)cached;
- if(newAttributes != null) {
- for (int i = 0; i < includeFields.size(); i++) {
- in.put(includeFields.get(i), newAttributes.get(i));
- }
- }
- return in;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java
deleted file mode 100644
index f668683..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
- */
-
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.lib.db.cache.CacheManager;
-
-/**
- * @since 3.1.0
- */
-public class NullValuesCacheManager extends CacheManager
-{
-
- private static final NullObject NULL = new NullObject();
- @Override
- public Object get(Object key)
- {
- Object primaryVal = primary.get(key);
- if (primaryVal != null) {
- if (primaryVal == NULL) {
- return null;
- }
-
- return primaryVal;
- }
-
- Object backupVal = backup.get(key);
- if (backupVal != null) {
- primary.put(key, backupVal);
- } else {
- primary.put(key, NULL);
- }
- return backupVal;
-
- }
-
- private static class NullObject
- {
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java
deleted file mode 100644
index e707198..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java
+++ /dev/null
@@ -1,185 +0,0 @@
-package com.datatorrent.contrib.enrichment;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.lib.util.PojoUtils;
-import com.datatorrent.lib.util.PojoUtils.Getter;
-import com.datatorrent.lib.util.PojoUtils.Setter;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import com.esotericsoftware.kryo.NotNull;
-import org.apache.commons.lang3.ClassUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * This class takes a POJO as input and extract the value of the lookupKey configured
- * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
- * specified in the file/DB or based on include fieldMap are added to original tuple.
- *
- * Properties:<br>
- * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
- * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
- * <br>
- *
- * Example
- * The file contains data in json format, one entry per line. during setup entire file is read and
- * kept in memory for quick lookup.
- * If file contains following lines, and operator is configured with lookup key "productId"
- * { "productId": 1, "productCategory": 3 }
- * { "productId": 4, "productCategory": 10 }
- * { "productId": 3, "productCategory": 1 }
- *
- * And input tuple is
- * { amount=10.0, channelId=4, productId=3 }
- *
- * The tuple is modified as below before operator emits it on output port.
- * { amount=10.0, channelId=4, productId=3, productCategory=1 }
- *
- * @displayName BeanEnrichment
- * @category Database
- * @tags enrichment, lookup
- *
- * @since 2.1.0
- */
-public class POJOEnrichmentOperator extends AbstractEnrichmentOperator<Object, Object> {
-
- private transient static final Logger logger = LoggerFactory.getLogger(POJOEnrichmentOperator.class);
- protected Class inputClass;
- protected Class outputClass;
- private transient List<Getter> getters = new LinkedList<Getter>();
- private transient List<FieldObjectMap> fieldMap = new LinkedList<FieldObjectMap>();
- private transient List<Setter> updateSetter = new LinkedList<Setter>();
-
- @NotNull
- protected String outputClassStr;
-
-
- @Override
- protected Object getKey(Object tuple) {
- ArrayList<Object> keyList = new ArrayList<Object>();
- for(Getter g : getters) {
- keyList.add(g.get(tuple));
- }
- return keyList;
- }
-
- @Override
- protected Object convert(Object in, Object cached) {
- try {
- Object o = outputClass.newInstance();
-
- // Copy the fields from input to output
- for (FieldObjectMap map : fieldMap) {
- map.set.set(o, map.get.get(in));
- }
-
- if (cached == null)
- return o;
-
- if(updateSetter.size() == 0 && includeFields.size() != 0) {
- populateUpdatesFrmIncludeFields();
- }
- ArrayList<Object> newAttributes = (ArrayList<Object>)cached;
- int idx = 0;
- for(Setter s: updateSetter) {
- s.set(o, newAttributes.get(idx));
- idx++;
- }
- return o;
- } catch (InstantiationException e) {
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void setup(Context.OperatorContext context) {
- super.setup(context);
- populateUpdatesFrmIncludeFields();
- }
-
- private void populateGettersFrmLookup()
- {
- for (String fName : lookupFields) {
- Getter f = PojoUtils.createGetter(inputClass, fName, Object.class);
- getters.add(f);
- }
- }
-
- private void populateGettersFrmInput()
- {
- Field[] fields = inputClass.getFields();
- for (Field f : fields) {
- Class c = ClassUtils.primitiveToWrapper(f.getType());
- FieldObjectMap fieldMap = new FieldObjectMap();
- fieldMap.get = PojoUtils.createGetter(inputClass, f.getName(), c);
- try {
- fieldMap.set = PojoUtils.createSetter(outputClass, f.getName(), c);
- } catch (Throwable e) {
- throw new RuntimeException("Failed to initialize Output Class for field: " + f.getName(), e);
- }
- this.fieldMap.add(fieldMap);
- }
- }
-
- private void populateUpdatesFrmIncludeFields() {
- if (this.outputClass == null) {
- logger.debug("Creating output class instance from string: {}", outputClassStr);
- try {
- this.outputClass = this.getClass().getClassLoader().loadClass(outputClassStr);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
-
- for (String fName : includeFields) {
- try {
- Field f = outputClass.getField(fName);
- Class c;
- if(f.getType().isPrimitive()) {
- c = ClassUtils.primitiveToWrapper(f.getType());
- } else {
- c = f.getType();
- }
- try {
- updateSetter.add(PojoUtils.createSetter(outputClass, f.getName(), c));
- } catch (Throwable e) {
- throw new RuntimeException("Failed to initialize Output Class for field: " + f.getName(), e);
- }
- } catch (NoSuchFieldException e) {
- throw new RuntimeException("Cannot find field '" + fName + "' in output class", e);
- }
- }
- }
-
- public String getOutputClassStr()
- {
- return outputClassStr;
- }
-
- public void setOutputClassStr(String outputClassStr)
- {
- this.outputClassStr = outputClassStr;
- }
-
- @Override protected void processTuple(Object tuple)
- {
- if (inputClass == null) {
- inputClass = tuple.getClass();
- populateGettersFrmLookup();
- populateGettersFrmInput();
- }
- super.processTuple(tuple);
- }
-
- private class FieldObjectMap
- {
- public Getter get;
- public Setter set;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java
deleted file mode 100644
index 3357704..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
- */
-package com.datatorrent.contrib.enrichment;
-
-import java.util.List;
-import java.util.Map;
-/**
- * @since 3.1.0
- */
-
-public abstract class ReadOnlyBackup implements EnrichmentBackup
-{
- protected transient List<String> includeFields;
- protected transient List<String> lookupFields;
-
- @Override public void put(Object key, Object value)
- {
- throw new RuntimeException("Not supported operation");
- }
-
- @Override public void putAll(Map<Object, Object> m)
- {
- throw new RuntimeException("Not supported operation");
- }
-
- @Override public void remove(Object key)
- {
- throw new RuntimeException("Not supported operation");
- }
-
- @Override public void setFields(List<String> lookupFields,List<String> includeFields)
- {
- this.includeFields = includeFields;
- this.lookupFields = lookupFields;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java
deleted file mode 100644
index 7d5b4cd..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java
+++ /dev/null
@@ -1 +0,0 @@
-package com.datatorrent.contrib.enrichment;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/EmployeeOrder.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/EmployeeOrder.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/EmployeeOrder.java
new file mode 100644
index 0000000..6015435
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/EmployeeOrder.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+/**
+ * Test bean exposing OID, ID, amount, NAME, AGE, ADDRESS and SALARY as public fields with accessors.
+ */
+public class EmployeeOrder
+{
+  public int OID;
+  public int ID;
+  public double amount;
+  public String NAME;
+  public int AGE;
+  public String ADDRESS;
+  public double SALARY;
+
+  public int getOID()
+  {
+    return OID;
+  }
+
+  public void setOID(int OID)
+  {
+    this.OID = OID;
+  }
+
+  public int getID()
+  {
+    return ID;
+  }
+
+  public void setID(int ID)
+  {
+    this.ID = ID;
+  }
+
+  public int getAGE()
+  {
+    return AGE;
+  }
+
+  public void setAGE(int AGE)
+  {
+    this.AGE = AGE;
+  }
+
+  public String getNAME()
+  {
+    return NAME;
+  }
+
+  public void setNAME(String NAME)
+  {
+    this.NAME = NAME;
+  }
+
+  public double getAmount()
+  {
+    return amount;
+  }
+
+  public void setAmount(double amount)
+  {
+    this.amount = amount;
+  }
+
+  public String getADDRESS()
+  {
+    return ADDRESS;
+  }
+
+  public void setADDRESS(String ADDRESS)
+  {
+    this.ADDRESS = ADDRESS;
+  }
+
+  public double getSALARY()
+  {
+    return SALARY;
+  }
+
+  public void setSALARY(double SALARY)
+  {
+    this.SALARY = SALARY;
+  }
+
+  /** Renders every field; an unset ADDRESS prints as null instead of throwing. */
+  @Override
+  public String toString()
+  {
+    // ADDRESS is a plain reference field that may never be assigned, so guard trim().
+    String address = ADDRESS == null ? null : ADDRESS.trim();
+    return "{" + "OID=" + OID + ", ID=" + ID + ", amount=" + amount +
+      ", NAME='" + NAME + '\'' + ", AGE=" + AGE +
+      ", ADDRESS='" + address + '\'' + ", SALARY=" + SALARY + '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
new file mode 100644
index 0000000..f24a13c
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.collect.Maps;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/** Exercises {@code MapEnricher} backed by an {@code FSLoader} over a copied mapping file. */
+public class FileEnrichmentTest
+{
+  @Rule
+  public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();
+
+  /** Enriches one map tuple by productId and verifies that productCategory is added. */
+  @Test
+  public void testEnrichmentOperator() throws IOException, InterruptedException
+  {
+    URL origUrl = this.getClass().getResource("/productmapping.txt");
+
+    // Copy the mapping resource to productmapping1.txt beside it and load that copy.
+    URL fileUrl = new URL(this.getClass().getResource("/").toString() + "productmapping1.txt");
+    FileUtils.deleteQuietly(new File(fileUrl.getPath()));
+    FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath()));
+
+    MapEnricher oper = new MapEnricher();
+    FSLoader store = new FSLoader();
+    store.setFileName(fileUrl.toString());
+    oper.setLookupFields(Arrays.asList("productId"));
+    oper.setIncludeFields(Arrays.asList("productCategory"));
+    oper.setStore(store);
+
+    oper.setup(null);
+
+    // NOTE(review): cache-size check is disabled; the earlier comment promised 5 distinct entries
+    // while the assertion expects 7 -- confirm against productmapping.txt before re-enabling.
+    //Assert.assertEquals("Number of mappings ", 7, oper.cache.size());
+
+    CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<>();
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+    oper.output.setSink(tmp);
+
+    oper.activate(null);
+
+    oper.beginWindow(0);
+    Map<String, Object> tuple = Maps.newHashMap();
+    tuple.put("productId", 3);
+    tuple.put("channelId", 4);
+    tuple.put("amount", 10.0);
+
+    Kryo kryo = new Kryo();
+    oper.input.process(kryo.copy(tuple));
+
+    oper.endWindow();
+    oper.deactivate();
+
+    /* Exactly one tuple is emitted */
+    Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
+    Map<String, Object> emitted = sink.collectedTuples.iterator().next();
+
+    /* The fields present in the original event are kept as they are */
+    Assert.assertEquals("Number of fields in emitted tuple", 4, emitted.size());
+    Assert.assertEquals("value of productId is 3", tuple.get("productId"), emitted.get("productId"));
+    Assert.assertEquals("value of channelId is 4", tuple.get("channelId"), emitted.get("channelId"));
+    Assert.assertEquals("value of amount is 10.0", tuple.get("amount"), emitted.get("amount"));
+
+    /* Check that productCategory is added to the event with the mapped value */
+    Assert.assertTrue("productCategory is part of tuple", emitted.containsKey("productCategory"));
+    Assert.assertEquals("value of product category is 5", 5, emitted.get("productCategory"));
+    Assert.assertTrue(emitted.get("productCategory") instanceof Integer);
+  }
+}
+