Posted to dev@apex.apache.org by chinmaykolhatkar <gi...@git.apache.org> on 2016/04/06 19:03:24 UTC

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

GitHub user chinmaykolhatkar opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235

    APEXMALHAR-2023 Enricher

    POJO and Map Enricher along with File and JDBC loader.
    
    @tushargosavi Please review

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chinmaykolhatkar/incubator-apex-malhar APEXMALHAR-2023_Enrichment

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/235.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #235
    
----
commit 1744ea5d6f4988cc456ef21c030c60affc8d5ff3
Author: Chaitanya <ch...@datatorrent.com>
Date:   2016-03-29T09:16:17Z

    APEXMALHAR-2023 Adding Enrichment Operator to Malhar
    
    Developed Map and Bean Enrichment Operators
    Developed JDBC Loader
    Developed HBase Loader

commit 8ac53bd238b5091604571186857cb7c44e35b738
Author: chinmaykolhatkar <ch...@datatorrent.com>
Date:   2016-03-29T09:16:54Z

    APEXMALHAR-2023 Adding Enrichment Operator to Malhar
    
    Added 2 operators, POJOEnricher and MapEnricher, which enrich the given POJO or map as configured.
    The operators are marked evolving.
    Test cases added for both operators.
    
    2 backend loaders are added, for File and JDBC.
    HBase loader removed as there are some things that need to be discussed. Implementation postponed until after that discussion.
    
    Added a new SupportType for OBJECT to FieldInfo.
    Added a static method in FieldInfo for converting Class -> SupportType.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60900129
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.db.cache.CacheManager;
    +import com.datatorrent.lib.db.cache.CacheStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +
    +/**
    + * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
    + * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
    + * <p>
    + * Properties:<br>
    + * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
    + * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
    + * <b>store</b>: Specify the type of loader for looking data<br>
    + * <br>
    + *
    + * @param <INPUT>  Type of tuples which are received by this operator</T>
    + * @param <OUTPUT> Type of tuples which are emitted by this operator</T>
    + * @displayName Abstract Enrichment Operator
    + * @tags Enrichment
    + */
    +@InterfaceStability.Evolving
    +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
    +{
    +  /**
    +   * Mandatory parameters for Enricher
    +   */
    +  @NotNull
    +  protected List<String> lookupFields;
    +  @NotNull
    +  protected List<String> includeFields;
    +  @NotNull
    +  private BackendLoader store;
    +
    +  /**
    +   * Optional parameters for enricher.
    +   */
    +  private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
    +  private int cacheCleanupInterval = 24 * 60 * 60 * 1000;
    +  private int cacheSize = 1024;
    +
    +  /**
    +   * Helper variables.
    +   */
    +  private transient CacheManager cacheManager;
    +  protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
    +  protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
    +
    +  /**
    +   * This method needs to be called by implementing class for processing a tuple for enrichment.
    +   * The method will take the tuple through the following stages:
    +   * <ol>
    +   * <li>Call {@link #getKey(Object)} to retrieve key fields for lookup</li>
    +   * <li>Using key fields call cache manager to retrieve for any key that is cached already</li>
    +   * <li>If not found in cache, it'll do a lookup in configured backend store</li>
    +   * <li>The retrieved fields will be passed to {@link #convert(Object, Object)} method to create the final object</li>
    +   * <li>Finally {@link #emitTuple(Object)} is called for emitting the tuple</li>
    +   * </ol>
    +   *
    --- End diff --
    
    The implementing classes will have an error port.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60208005
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.db.cache.CacheManager;
    +import com.datatorrent.lib.db.cache.CacheStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +
    +/**
    + * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
    + * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
    + * <p>
    + * Properties:<br>
    + * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
    + * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
    + * <b>store</b>: Specify the type of loader for looking data<br>
    + * <br>
    + *
    + * @param <INPUT>  Type of tuples which are received by this operator</T>
    + * @param <OUTPUT> Type of tuples which are emitted by this operator</T>
    + * @displayName Abstract Enrichment Operator
    + * @tags Enrichment
    + */
    +@InterfaceStability.Evolving
    +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
    +{
    +  /**
    +   * Mandatory parameters for Enricher
    +   */
    +  @NotNull
    +  protected List<String> lookupFields;
    +  @NotNull
    +  protected List<String> includeFields;
    +  @NotNull
    +  private BackendLoader store;
    +
    +  /**
    +   * Optional parameters for enricher.
    +   */
    +  private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
    +  private int cacheCleanupInterval = 24 * 60 * 60 * 1000;
    +  private int cacheSize = 1024;
    --- End diff --
    
    It is very difficult to come up with a good default value here. Please make sure this value is explained well in the Javadocs.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60205647
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.db.cache.CacheManager;
    +import com.datatorrent.lib.db.cache.CacheStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +
    +/**
    + * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
    + * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
    + * <p>
    + * Properties:<br>
    + * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
    + * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
    + * <b>store</b>: Specify the type of loader for looking data<br>
    + * <br>
    + *
    + * @param <INPUT>  Type of tuples which are received by this operator</T>
    + * @param <OUTPUT> Type of tuples which are emitted by this operator</T>
    + * @displayName Abstract Enrichment Operator
    + * @tags Enrichment
    + */
    +@InterfaceStability.Evolving
    +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
    +{
    +  /**
    +   * Mandatory parameters for Enricher
    +   */
    +  @NotNull
    +  protected List<String> lookupFields;
    +  @NotNull
    +  protected List<String> includeFields;
    +  @NotNull
    +  private BackendLoader store;
    +
    +  /**
    +   * Optional parameters for enricher.
    +   */
    +  private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
    --- End diff --
    
    How much is that? Please write it in a comment.
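
For reference, the literal works out to 24 hours if the interval is read as milliseconds. The sketch below only checks that arithmetic. The millisecond unit is an assumption (the diff does not state it), and the class name `CacheIntervalDefaults` is hypothetical and not part of the pull request.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used only to illustrate the arithmetic.
class CacheIntervalDefaults {
  // Same expression as in the diff: 24 h * 60 min * 60 s * 1000 ms.
  static final int LITERAL_DEFAULT = 24 * 60 * 60 * 1000;

  // Equivalent form that names the unit explicitly (assumes milliseconds).
  static final long ONE_DAY_MILLIS = TimeUnit.DAYS.toMillis(1);

  public static void main(String[] args) {
    System.out.println(LITERAL_DEFAULT);                    // 86400000
    System.out.println(ONE_DAY_MILLIS == LITERAL_DEFAULT);  // true
  }
}
```

Spelling the unit out in a comment, or through `TimeUnit`, would answer the question without the reader having to multiply the factors.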



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/235



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60208711
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.db.cache.CacheManager;
    +import com.datatorrent.lib.db.cache.CacheStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +
    +/**
    + * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
    + * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
    + * <p>
    + * Properties:<br>
    + * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
    + * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
    + * <b>store</b>: Specify the type of loader for looking data<br>
    + * <br>
    + *
    + * @param <INPUT>  Type of tuples which are received by this operator</T>
    + * @param <OUTPUT> Type of tuples which are emitted by this operator</T>
    + * @displayName Abstract Enrichment Operator
    + * @tags Enrichment
    + */
    +@InterfaceStability.Evolving
    +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
    +{
    +  /**
    +   * Mandatory parameters for Enricher
    +   */
    +  @NotNull
    +  protected List<String> lookupFields;
    +  @NotNull
    +  protected List<String> includeFields;
    +  @NotNull
    +  private BackendLoader store;
    +
    +  /**
    +   * Optional parameters for enricher.
    +   */
    +  private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
    +  private int cacheCleanupInterval = 24 * 60 * 60 * 1000;
    +  private int cacheSize = 1024;
    +
    +  /**
    +   * Helper variables.
    +   */
    +  private transient CacheManager cacheManager;
    +  protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
    +  protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
    +
    +  /**
    +   * This method needs to be called by implementing class for processing a tuple for enrichment.
    +   * The method will take the tuple through the following stages:
    +   * <ol>
    +   * <li>Call {@link #getKey(Object)} to retrieve key fields for lookup</li>
    +   * <li>Using key fields call cache manager to retrieve for any key that is cached already</li>
    +   * <li>If not found in cache, it'll do a lookup in configured backend store</li>
    +   * <li>The retrieved fields will be passed to {@link #convert(Object, Object)} method to create the final object</li>
    +   * <li>Finally {@link #emitTuple(Object)} is called for emitting the tuple</li>
    +   * </ol>
    +   *
    +   * @param tuple Input tuple that needs to get processed for enrichment.
    +   */
    +  protected void enrichTuple(INPUT tuple)
    +  {
    +    Object key = getKey(tuple);
    +    if (key != null) {
    +      Object result = cacheManager.get(key);
    +      OUTPUT out = convert(tuple, result);
    +      emitTuple(out);
    --- End diff --
    
    Can you rename it to something else? It could be confused with emitTuple of the input adapter.
    Suggestion: emitEnrichedTuple()
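
The stages listed in the Javadoc of enrichTuple, with the suggested method name, could look roughly like the sketch below. This is a plain-Java illustration and not the code from the pull request: `EnrichFlowSketch`, `EnrichFlowDemo`, the in-memory `cache` map and the `backend` function are hypothetical stand-ins for the operator, CacheManager and BackendLoader. Skipping tuples with a null key is an assumption, since the quoted diff ends before that branch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the abstract operator.
abstract class EnrichFlowSketch<I, O> {
  private final Map<Object, Object> cache = new HashMap<>();
  private final Function<Object, Object> backend; // stand-in for a backend store lookup

  EnrichFlowSketch(Function<Object, Object> backend) {
    this.backend = backend;
  }

  protected abstract Object getKey(I tuple);
  protected abstract O convert(I tuple, Object cached);
  protected abstract void emitEnrichedTuple(O tuple); // name suggested in the review

  protected void enrichTuple(I tuple) {
    Object key = getKey(tuple);
    if (key == null) {
      return; // assumption: tuples without a key are skipped in this sketch
    }
    Object result = cache.get(key);       // 1. try the cache
    if (result == null) {
      result = backend.apply(key);        // 2. fall back to the backend store
      if (result != null) {
        cache.put(key, result);
      }
    }
    emitEnrichedTuple(convert(tuple, result)); // 3. build and emit the output
  }
}

class EnrichFlowDemo {
  static List<String> run() {
    Map<Object, Object> store = new HashMap<>();
    store.put(3, 1); // productId 3 -> productCategory 1
    List<String> emitted = new ArrayList<>();
    EnrichFlowSketch<Integer, String> op = new EnrichFlowSketch<Integer, String>(store::get) {
      @Override
      protected Object getKey(Integer productId) {
        return productId;
      }

      @Override
      protected String convert(Integer productId, Object cached) {
        return "productId=" + productId + ", productCategory=" + cached;
      }

      @Override
      protected void emitEnrichedTuple(String tuple) {
        emitted.add(tuple);
      }
    };
    op.enrichTuple(3);
    return emitted;
  }
}
```

With the hook named `emitEnrichedTuple`, a subclass reader can tell at a glance that it emits the converted output rather than raw input.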



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60872275
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/ReadOnlyBackup.java ---
    @@ -0,0 +1,61 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.lib.util.FieldInfo;
    +
    +/**
    + * ReadOnly abstract implementation of BackendLoader.
    + */
    +@InterfaceStability.Evolving
    +public abstract class ReadOnlyBackup implements BackendLoader
    +{
    +  protected transient List<FieldInfo> includeFieldInfo;
    +  protected transient List<FieldInfo> lookupFieldInfo;
    +
    +  @Override
    +  public void put(Object key, Object value)
    +  {
    +    throw new RuntimeException("Not supported operation");
    --- End diff --
    
    UnsupportedOperationException.
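
A minimal sketch of a read-only store that rejects writes with `java.lang.UnsupportedOperationException`, the standard JDK exception for unsupported optional operations. `ReadOnlyStoreSketch` is a hypothetical class for illustration only. It does not implement the BackendLoader interface from the pull request.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical read-only store, not the ReadOnlyBackup class from the diff.
class ReadOnlyStoreSketch {
  private final Map<Object, Object> data;

  ReadOnlyStoreSketch(Map<Object, Object> initial) {
    // Defensive copy so later changes to the caller's map are not visible here.
    this.data = Collections.unmodifiableMap(new HashMap<>(initial));
  }

  Object get(Object key) {
    return data.get(key);
  }

  void put(Object key, Object value) {
    // More specific than a bare RuntimeException: callers can catch exactly this type.
    throw new UnsupportedOperationException("put is not supported by a read-only store");
  }

  // Returns true when put() is rejected with UnsupportedOperationException.
  static boolean putIsRejected() {
    ReadOnlyStoreSketch store = new ReadOnlyStoreSketch(new HashMap<>());
    try {
      store.put("k", "v");
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }
}
```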



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60870827
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java ---
    @@ -0,0 +1,264 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +
    +/**
    + * This class takes a POJO as input and extract the value of the lookupKey configured
    + * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
    + * specified in the file/DB or based on include fieldMap are added to original tuple.
    + * This operator is App Builder schema support enabled. <br>
    + * <p>
    + * Properties:<br>
    + * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
    + * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
    + * <br>
    + * <p>
    + * Example
    + * The file contains data in json format, one entry per line. during setup entire file is read and
    + * kept in memory for quick lookup.
    + * If file contains following lines, and operator is configured with lookup key "productId"
    + * { "productId": 1, "productCategory": 3 }
    + * { "productId": 4, "productCategory": 10 }
    + * { "productId": 3, "productCategory": 1 }
    + * <p>
    + * And input tuple is
    + * { amount=10.0, channelId=4, productId=3 }
    + * <p>
    + * The tuple is modified as below before operator emits it on output port.
    + * { amount=10.0, channelId=4, productId=3, productCategory=1 }
    + *
    + * @displayName BeanEnrichment
    + * @category Database
    + * @tags enrichment, pojo, schema, lookup
    + */
    +@InterfaceStability.Evolving
    +public class POJOEnricher extends AbstractEnricher<Object, Object>
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(POJOEnricher.class);
    +
    +  /**
    +   * Helper fields
    +   */
    +  protected Class<?> inputClass;
    +  protected Class<?> outputClass;
    +  private transient Map<PojoUtils.Getter, PojoUtils.Setter> fieldMap = new HashMap<>();
    +  private transient List<PojoUtils.Setter> includeSetters = new ArrayList<>();
    +  private transient List<PojoUtils.Getter> lookupGetters = new ArrayList<>();
    +
    +  @InputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +
    +    @Override
    +    public void process(Object object)
    +    {
    +      processTuple(object);
    +    }
    +  };
    +
    +  @OutputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +  };
    +
    +  protected void processTuple(Object object)
    +  {
    +    enrichTuple(object);
    +  }
    +
    +  @Override
    +  protected Object getKey(Object tuple)
    +  {
    +    ArrayList<Object> keyList = new ArrayList<>();
    +    for (PojoUtils.Getter lookupGetter : lookupGetters) {
    +      keyList.add(lookupGetter.get(tuple));
    +    }
    +    return keyList;
    +  }
    +
    +  @Override
    +  protected Object convert(Object in, Object cached)
    +  {
    +    Object o;
    +
    +    try {
    +      o = outputClass.newInstance();
    +    } catch (InstantiationException | IllegalAccessException e) {
    +      logger.error("Failed to create new instance of output POJO", e);
    +      return null;
    --- End diff --
    
    Can the input tuple be redirected to an error port?
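
One way to picture the idea is the plain-Java sketch below, which reuses the productId data from the class Javadoc. It is not the operator code: `ErrorRoutingSketch` is hypothetical, the two lists stand in for an output port and an error port, and a missing productId is an assumed failure condition chosen only to trigger the error path.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: lists stand in for the operator's output and error ports.
class ErrorRoutingSketch {
  final List<Map<String, Object>> output = new ArrayList<>();
  final List<Map<String, Object>> error = new ArrayList<>();
  private final Map<Object, Object> categoryByProductId = new HashMap<>();

  ErrorRoutingSketch() {
    // Lookup data taken from the example in the Javadoc.
    categoryByProductId.put(1, 3);
    categoryByProductId.put(4, 10);
    categoryByProductId.put(3, 1);
  }

  void process(Map<String, Object> tuple) {
    try {
      output.add(convert(tuple));
    } catch (RuntimeException e) {
      // Instead of logging and dropping the tuple, hand the original input on.
      error.add(tuple);
    }
  }

  private Map<String, Object> convert(Map<String, Object> in) {
    Object productId = in.get("productId");
    if (productId == null) {
      // Assumed failure condition for this sketch.
      throw new IllegalArgumentException("tuple has no productId");
    }
    Map<String, Object> out = new HashMap<>(in);
    // An unknown productId yields a null category in this sketch.
    out.put("productCategory", categoryByProductId.get(productId));
    return out;
  }
}
```

Processing `{ amount=10.0, channelId=4, productId=3 }` adds a tuple with `productCategory=1` to `output`, while a tuple without `productId` lands unchanged in `error`.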



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60867383
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java ---
    @@ -0,0 +1,181 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.codehaus.jackson.JsonProcessingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +import org.codehaus.jackson.map.ObjectReader;
    +import org.codehaus.jackson.type.TypeReference;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.google.common.collect.Maps;
    +import com.datatorrent.lib.util.FieldInfo;
    +
    +
    +/**
    + * This implementation of {@link BackendLoader} loads the data from given file, puts in memory cache and serves the
    + * queries from the same cache.
    + * When this becomes part of cache manager, it can call {@link #loadInitialData()} periodically to reload the file.
    + * <p>
    + * The format of the file looks like following:
    + * <p>
    + * {"productCategory": 5, "productId": 0}
    + * {"productCategory": 4, "productId": 1}
    + * {"productCategory": 5, "productId": 2}
    + * {"productCategory": 5, "productId": 3}
    + * </p>
    + * Each line in a file should be a valid json object which represent the record and each key/value pair in that json object
    + * represents the fields/value.
    + * <p>
    + * NOTE: This loader should be used with caution as all the data present in the file is loaded in memory.
    + */
    +@InterfaceStability.Evolving
    +public class FSLoader extends ReadOnlyBackup
    +{
    +  @NotNull
    +  private String fileName;
    +
    +  private transient Path filePath;
    +  private transient FileSystem fs;
    +  private transient boolean connected;
    +
    +  private static final ObjectMapper mapper = new ObjectMapper();
    +  private static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>(){});
    +  private static final Logger logger = LoggerFactory.getLogger(FSLoader.class);
    +
    +  public String getFileName()
    +  {
    +    return fileName;
    +  }
    +
    +  public void setFileName(String fileName)
    +  {
    +    this.fileName = fileName;
    +  }
    +
    +  @Override
    +  public Map<Object, Object> loadInitialData()
    +  {
    +    Map<Object, Object> result = null;
    +    FSDataInputStream in = null;
    +    BufferedReader bin = null;
    +    try {
    +      result = Maps.newHashMap();
    +      in = fs.open(filePath);
    +      bin = new BufferedReader(new InputStreamReader(in));
    +      String line;
    +      while ((line = bin.readLine()) != null) {
    +        try {
    +          Map<String, Object> tuple = reader.readValue(line);
    +          result.put(getKey(tuple), getValue(tuple));
    +        } catch (JsonProcessingException parseExp) {
    +          logger.info("Unable to parse line {}", line);
    +        }
    +      }
    +    } catch (IOException e) {
    +      throw new RuntimeException(e);
    +    } finally {
    +      if (bin != null) {
    +        IOUtils.closeQuietly(bin);
    +      }
    +      if (in != null) {
    +        IOUtils.closeQuietly(in);
    +      }
    +      try {
    +        fs.close();
    +      } catch (IOException e) {
    +        throw new RuntimeException(e);
    +      }
    +    }
    +    logger.debug("loading initial data {}", result.size());
    +    return result;
    +  }
    +
    +  private Object getValue(Map<String, Object> tuple)
    +  {
    +    ArrayList<Object> includeTuple = new ArrayList<Object>();
    +    for (FieldInfo s : includeFieldInfo) {
    +      includeTuple.add(tuple.get(s.getColumnName()));
    +    }
    +    return includeTuple;
    +  }
    +
    +  private Object getKey(Map<String, Object> tuple)
    +  {
    +    ArrayList<Object> lst = new ArrayList<Object>();
    +    for (FieldInfo key : lookupFieldInfo) {
    +      lst.add(tuple.get(key.getColumnName()));
    +    }
    +    return lst;
    +  }
    +
    +  @Override
    +  public Object get(Object key)
    +  {
    +    return null;
    +  }
    +
    +  @Override
    +  public List<Object> getAll(List<Object> keys)
    +  {
    +    return null;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    Configuration conf = new Configuration();
    +    this.filePath = new Path(fileName);
    +    this.fs = FileSystem.newInstance(filePath.toUri(), conf);
    --- End diff --
    
    How frequently are you refreshing your cache? It would also be good to check whether the file has been updated since the last refresh, and load the data only if it has.
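
The check suggested above could look roughly like the sketch below. It is not code from the pull request: ModificationAwareLoader and its methods are hypothetical names, and it uses java.nio.file on a local file so that it runs on its own. FSLoader works against a Hadoop FileSystem, where the file's modification time from its FileStatus would presumably play the same role.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the pull request.
class ModificationAwareLoader
{
  private final Path file;
  private long lastLoadedMillis = -1L; // modification time recorded at the last load
  private List<String> cachedLines = new ArrayList<>();
  private int loadCount = 0;

  ModificationAwareLoader(Path file)
  {
    this.file = file;
  }

  // Re-reads the file only when its modification time differs from the one
  // recorded at the previous load; otherwise the cached lines are returned.
  List<String> refresh() throws IOException
  {
    long modified = Files.getLastModifiedTime(file).toMillis();
    if (modified != lastLoadedMillis) {
      cachedLines = Files.readAllLines(file);
      lastLoadedMillis = modified;
      loadCount++;
    }
    return cachedLines;
  }

  // Number of times the file was actually read.
  int getLoadCount()
  {
    return loadCount;
  }
}
```

Calling refresh() on every cache refresh interval then costs one metadata lookup when the file is unchanged, instead of a full read and parse.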


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60210490
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.db.cache.CacheManager;
    +import com.datatorrent.lib.db.cache.CacheStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +
    +/**
    + * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
    + * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
    + * <p>
    + * Properties:<br>
    + * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
    + * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
    + * <b>store</b>: Specify the type of loader for looking data<br>
    + * <br>
    + *
    + * @param <INPUT>  Type of tuples which are received by this operator
    + * @param <OUTPUT> Type of tuples which are emitted by this operator
    + * @displayName Abstract Enrichment Operator
    + * @tags Enrichment
    + */
    +@InterfaceStability.Evolving
    +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
    +{
    +  /**
    +   * Mandatory parameters for Enricher
    +   */
    +  @NotNull
    +  protected List<String> lookupFields;
    +  @NotNull
    +  protected List<String> includeFields;
    +  @NotNull
    +  private BackendLoader store;
    +
    +  /**
    +   * Optional parameters for enricher.
    +   */
    +  private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
    +  private int cacheCleanupInterval = 24 * 60 * 60 * 1000;
    +  private int cacheSize = 1024;
    +
    +  /**
    +   * Helper variables.
    +   */
    +  private transient CacheManager cacheManager;
    +  protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
    +  protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
    +
    +  /**
    +   * This method needs to be called by implementing class for processing a tuple for enrichment.
    +   * The method will take the tuple through the following stages:
    +   * <ol>
    +   * <li>Call {@link #getKey(Object)} to retrieve key fields for lookup</li>
    +   * <li>Using key fields call cache manager to retrieve for any key that is cached already</li>
    +   * <li>If not found in cache, it'll do a lookup in configured backend store</li>
    +   * <li>The retrieved fields will be passed to {@link #convert(Object, Object)} method to create the final object</li>
    +   * <li>Finally {@link #emitTuple(Object)} is called for emitting the tuple</li>
    +   * </ol>
    +   *
    +   * @param tuple Input tuple that needs to get processed for enrichment.
    +   */
    +  protected void enrichTuple(INPUT tuple)
    +  {
    +    Object key = getKey(tuple);
    +    if (key != null) {
    +      Object result = cacheManager.get(key);
    +      OUTPUT out = convert(tuple, result);
    +      emitTuple(out);
    +    }
    +  }
    +
    +  /**
    +   * The method should be implemented by concrete class which returns an ArrayList<Object> containing all the fields
    +   * which forms key part of lookup.
    +   * The order of field values should be same as the one set in {@link #lookupFields} variable.
    +   *
    +   * @param tuple Input tuple from which fields values for key needs to be fetched.
    +   * @return Should return ArrayList<Object> which has fields values forming keys in same order as {@link #lookupFields}
    +   */
    +  protected abstract Object getKey(INPUT tuple);
    +
    +  /**
    +   * The method should be implemented by concrete class.
    +   * This method is expected to take input tuple and a externally fetched object contained field to be enriched, and
    +   * return a Enriched tuple which is ready to be emitted.
    +   *
    +   * @param in     Input tuple which needs to be enriched.
    +   * @param cached ArrayList<Object> containing missing data retrieved from external sources.
    +   * @return Enriched tuple of type OUTPUT
    +   */
    +  protected abstract OUTPUT convert(INPUT in, Object cached);
    +
    +  /**
    +   * This method should be implemented by concrete class.
    +   * The method is expected to emit tuple of type OUTPUT
    +   *
    +   * @param tuple Tuple of type OUTPUT that should be emitted.
    +   */
    +  protected abstract void emitTuple(OUTPUT tuple);
    +
    +  /**
    +   * This method should be implemented by concrete method.
    +   * The method should return Class type of field for given fieldName from output tuple.
    +   *
    +   * @param fieldName Field name for which field type needs to be identified
    +   * @return Class type for given field.
    +   */
    +  protected abstract Class<?> getIncludeFieldType(String fieldName);
    +
    +  /**
    +   * This method should be implemented by concrete method.
    +   * The method should return Class type of field for given fieldName from input tuple.
    +   *
    +   * @param fieldName Field name for which field type needs to be identified
    +   * @return Class type for given field.
    +   */
    +  protected abstract Class<?> getLookupFieldType(String fieldName);
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    super.setup(context);
    +
    +    cacheManager = new NullValuesCacheManager();
    +    CacheStore primaryCache = new CacheStore();
    +
    +    // set expiration to one day.
    +    primaryCache.setEntryExpiryDurationInMillis(cacheExpirationInterval);
    +    primaryCache.setCacheCleanupInMillis(cacheCleanupInterval);
    +    primaryCache.setEntryExpiryStrategy(CacheStore.ExpiryType.EXPIRE_AFTER_WRITE);
    +    primaryCache.setMaxCacheSize(cacheSize);
    +
    +    cacheManager.setPrimary(primaryCache);
    +    cacheManager.setBackup(store);
    +  }
    +
    +  @Override
    +  public void activate(Context context)
    +  {
    +    for (String s : lookupFields) {
    +      lookupFieldInfo.add(new FieldInfo(s, s, SupportType.getFromJavaType(getLookupFieldType(s))));
    +    }
    +
    +    if (includeFields != null) {
    +      for (String s : includeFields) {
    +        includeFieldInfo.add(new FieldInfo(s, s, SupportType.getFromJavaType(getIncludeFieldType(s))));
    +      }
    +    }
    +
    +    store.setFieldInfo(lookupFieldInfo, includeFieldInfo);
    +
    +    try {
    +      cacheManager.initialize();
    +    } catch (IOException e) {
    +      throw new RuntimeException("Unable to initialize primary cache", e);
    +    }
    +  }
    +
    +  @Override
    +  public void deactivate()
    +  {
    --- End diff --
    
    Close any resources here, such as the DB connection held by the cacheManager.
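
A minimal sketch of the cleanup being asked for, under stated assumptions: FakeConnection and ClosingEnricher are hypothetical stand-ins, not classes from the pull request, and the diff above does not show which close method the cache manager or the backend store actually offers.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical stand-in for a backend resource such as a JDBC connection.
class FakeConnection implements Closeable
{
  boolean open = true;

  @Override
  public void close()
  {
    open = false;
  }
}

// Hypothetical operator skeleton; only the activate/deactivate pairing matters here.
class ClosingEnricher
{
  private Closeable connection;

  // Takes ownership of the resource that deactivate() must release.
  void activate(Closeable backend)
  {
    connection = backend;
  }

  // Releases the resource acquired in activate(); safe to call more than once.
  void deactivate()
  {
    if (connection == null) {
      return;
    }
    try {
      connection.close();
    } catch (IOException e) {
      throw new RuntimeException("Unable to close backend connection", e);
    } finally {
      connection = null; // a second deactivate() becomes a no-op
    }
  }
}
```

Pairing the release with deactivate() mirrors the way activate() initializes the cache manager in the diff, so whatever is opened there is closed at the matching point.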


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60869315
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java ---
    @@ -0,0 +1,150 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.lib.util.FieldInfo;
    +
    +/**
    + * This class takes a HashMap tuple as input and extracts the value of the lookupKey configured
    + * for this operator. It then does a lookup in the file/DB to find a matching entry, and all key-value pairs
    + * specified in the file/DB, or those selected by the include fields, are added to the original tuple.
    + * <p>
    + * Example
    + * The file contains data in JSON format, one entry per line. During setup the entire file is read and
    + * kept in memory for quick lookup.
    + * If the file contains the following lines, and the operator is configured with lookup key "productId"
    + * <p>
    + * { "productId": 1, "productCategory": 3 }
    + * { "productId": 4, "productCategory": 10 }
    + * { "productId": 3, "productCategory": 1 }
    + * </p>
    + * And input tuple is
    + * <p>
    + * { amount=10.0, channelId=4, productId=3 }
    + * </p>
    + * The tuple is modified as below before operator emits it on output port.
    + * <p>
    + * { amount=10.0, channelId=4, productId=3, productCategory=1 }
    + * </p>
    + *
    + * @displayName MapEnrichment
    + * @category Database
    + * @tags enrichment, lookup, map
    + */
    +@InterfaceStability.Evolving
    +public class MapEnricher extends AbstractEnricher<Map<String, Object>, Map<String, Object>>
    +{
    +  public final transient DefaultInputPort<Map<String, Object>> input = new DefaultInputPort<Map<String, Object>>()
    +  {
    +    @Override
    +    public void process(Map<String, Object> obj)
    +    {
    +      processTuple(obj);
    +    }
    +  };
    +
    +  public final transient DefaultOutputPort<Map<String, Object>> output = new DefaultOutputPort<>();
    +
    +  private void processTuple(Map<String, Object> obj)
    +  {
    +    enrichTuple(obj);
    +  }
    +
    +  @Override
    +  protected Object getKey(Map<String, Object> tuple)
    +  {
    +    ArrayList<Object> keyList = new ArrayList<Object>();
    +
    +    for (FieldInfo fieldInfo : lookupFieldInfo) {
    +      keyList.add(tuple.get(fieldInfo.getColumnName()));
    +    }
    +
    +    return keyList;
    +  }
    +
    +  @Override
    +  protected Map<String, Object> convert(Map<String, Object> in, Object cached)
    +  {
    +    if (cached == null) {
    +      return in;
    +    }
    +
    +    ArrayList<Object> newAttributes = (ArrayList<Object>)cached;
    +    if (newAttributes != null) {
    +      for (int i = 0; i < includeFieldInfo.size(); i++) {
    +        in.put(includeFieldInfo.get(i).getColumnName(), newAttributes.get(i));
    +      }
    +    }
    +    return in;
    +  }
    +
    +  @Override
    +  protected void emitTuple(Map<String, Object> tuple)
    +  {
    +    output.emit(tuple);
    +  }
    +
    +  @Override
    +  protected Class<?> getIncludeFieldType(String fieldName)
    +  {
    +    return Object.class;
    +  }
    +
    +  @Override
    +  protected Class<?> getLookupFieldType(String fieldName)
    +  {
    +    return Object.class;
    +  }
    +
    +  /**
    +   * Set fields on which lookup needs to happen in external store.
    +   * This is a mandatory parameter to set.
    +   *
    +   * @param lookupFields List of fields on which lookup happens.
    +   * @description $[] Field which become part of lookup key
    +   * @useSchema $[] input.fields[].name
    +   */
    +  @Override
    +  public void setLookupFields(List<String> lookupFields)
    +  {
    +    super.setLookupFields(lookupFields);
    +  }
    +
    +  /**
    +   * Set fields on which lookup needs to happen in external store.
    --- End diff --
    
    Correct this. The description is identical to the one on setLookupFields above.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60902622
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java ---
    @@ -0,0 +1,60 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.lib.db.cache.CacheManager;
    +
    +/**
    + * Null Values Cache Manager. Using this NULL entries can be specified explicitly.
    + */
    +@InterfaceStability.Evolving
    +public class NullValuesCacheManager extends CacheManager
    +{
    +
    +  private static final NullObject NULL = new NullObject();
    --- End diff --
    
    This ensures that if "null" has any special significance in a database, it is recognized. Here, NULL means "not found".
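
The idea of caching "not found" explicitly can be sketched as follows. This is an illustration of the general sentinel pattern, not the NullValuesCacheManager implementation: NotFoundCachingLookup, its fields, and the map-backed backend are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a "not found" sentinel; not code from the pull request.
class NotFoundCachingLookup
{
  // Marker stored in the cache when the backend has no entry for a key.
  private static final Object NOT_FOUND = new Object();

  private final Map<Object, Object> cache = new HashMap<>();
  private final Map<Object, Object> backend;
  private int backendLookups = 0;

  NotFoundCachingLookup(Map<Object, Object> backend)
  {
    this.backend = backend;
  }

  // Returns the value for the key, or null when the backend has none.
  // A miss is remembered, so the backend is asked at most once per key.
  Object get(Object key)
  {
    Object cached = cache.get(key);
    if (cached == null) {
      backendLookups++;
      Object loaded = backend.get(key);
      cached = (loaded == null) ? NOT_FOUND : loaded;
      cache.put(key, cached);
    }
    return cached == NOT_FOUND ? null : cached;
  }

  int getBackendLookups()
  {
    return backendLookups;
  }
}
```

Without the sentinel, a key that is absent from the backend would trigger a backend query for every tuple that carries it.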


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60208514
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.db.cache.CacheManager;
    +import com.datatorrent.lib.db.cache.CacheStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +
    +/**
    + * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
    + * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
    + * <p>
    + * Properties:<br>
    + * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
    + * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
    + * <b>store</b>: Specify the type of loader for looking data<br>
    + * <br>
    + *
    + * @param <INPUT>  Type of tuples which are received by this operator
    + * @param <OUTPUT> Type of tuples which are emitted by this operator
    + * @displayName Abstract Enrichment Operator
    + * @tags Enrichment
    + */
    +@InterfaceStability.Evolving
    +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
    +{
    +  /**
    +   * Mandatory parameters for Enricher
    +   */
    +  @NotNull
    +  protected List<String> lookupFields;
    +  @NotNull
    +  protected List<String> includeFields;
    +  @NotNull
    +  private BackendLoader store;
    +
    +  /**
    +   * Optional parameters for enricher.
    +   */
    +  private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
    +  private int cacheCleanupInterval = 24 * 60 * 60 * 1000;
    +  private int cacheSize = 1024;
    +
    +  /**
    +   * Helper variables.
    +   */
    +  private transient CacheManager cacheManager;
    +  protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
    +  protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
    +
    +  /**
    +   * This method needs to be called by implementing class for processing a tuple for enrichment.
    +   * The method will take the tuple through the following stages:
    +   * <ol>
    +   * <li>Call {@link #getKey(Object)} to retrieve key fields for lookup</li>
    +   * <li>Using key fields call cache manager to retrieve for any key that is cached already</li>
    +   * <li>If not found in cache, it'll do a lookup in configured backend store</li>
    +   * <li>The retrieved fields will be passed to {@link #convert(Object, Object)} method to create the final object</li>
    +   * <li>Finally {@link #emitTuple(Object)} is called for emitting the tuple</li>
    +   * </ol>
    +   *
    --- End diff --
    
    We could add a metric for tuples that are not enriched.
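
One possible shape for such a metric is sketched below. CountingEnricher and its counters are hypothetical, the field names productId and productCategory are borrowed from the MapEnricher Javadoc example, and the wiring that would expose the counters through the platform's metric support is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not part of the pull request.
class CountingEnricher
{
  private final Map<Object, Object> store; // productId -> productCategory
  private long enrichedCount;
  private long notEnrichedCount;

  CountingEnricher(Map<Object, Object> store)
  {
    this.store = new HashMap<>(store);
  }

  // Adds productCategory when the lookup succeeds; otherwise passes the
  // tuple through unchanged and counts it as not enriched.
  Map<String, Object> enrich(Map<String, Object> tuple)
  {
    Object category = store.get(tuple.get("productId"));
    if (category == null) {
      notEnrichedCount++;
    } else {
      tuple.put("productCategory", category);
      enrichedCount++;
    }
    return tuple;
  }

  long getEnrichedCount()
  {
    return enrichedCount;
  }

  long getNotEnrichedCount()
  {
    return notEnrichedCount;
  }
}
```

Counting both outcomes makes it possible to report the miss ratio, which is usually more telling than the raw number of misses.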


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60868038
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java ---
    @@ -0,0 +1,201 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.ResultSetMetaData;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.collections.CollectionUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.google.common.collect.Lists;
    +import com.datatorrent.lib.db.jdbc.JdbcStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +
    +/**
    + * <p>JDBCLoader extends {@link JdbcStore}, uses JDBC to connect, and implements the {@link BackendLoader} interface.</p> <br/>
    + * <p>
    + * Properties:<br>
    + * <b>queryStmt</b>: Sql Prepared Statement which needs to be executed<br>
    + * <b>tableName</b>: JDBC table name<br>
    + * <br>
    + */
    +@InterfaceStability.Evolving
    +public class JDBCLoader extends JdbcStore implements BackendLoader
    +{
    +  protected String queryStmt;
    +
    +  protected String tableName;
    +
    +  protected transient List<FieldInfo> includeFieldInfo;
    +  protected transient List<FieldInfo> lookupFieldInfo;
    +
    +  protected Object getQueryResult(Object key)
    +  {
    +    try {
    +      PreparedStatement getStatement = getConnection().prepareStatement(queryStmt);
    +      ArrayList<Object> keys = (ArrayList<Object>)key;
    +      for (int i = 0; i < keys.size(); i++) {
    +        getStatement.setObject(i + 1, keys.get(i));
    +      }
    +      return getStatement.executeQuery();
    +    } catch (SQLException e) {
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  protected ArrayList<Object> getDataFrmResult(Object result) throws RuntimeException
    +  {
    +    try {
    +      ResultSet resultSet = (ResultSet)result;
    +      if (resultSet.next()) {
    +        ResultSetMetaData rsdata = resultSet.getMetaData();
    +        // If the includefields is empty, populate it from ResultSetMetaData
    +        if (CollectionUtils.isEmpty(includeFieldInfo)) {
    +          if (includeFieldInfo == null) {
    +            includeFieldInfo = new ArrayList<>();
    +          }
    +          for (int i = 1; i <= rsdata.getColumnCount(); i++) {
    +            String columnName = rsdata.getColumnName(i);
    +            // TODO: Take care of type conversion.
    +            includeFieldInfo.add(new FieldInfo(columnName, columnName, FieldInfo.SupportType.OBJECT));
    +          }
    +        }
    +
    +        ArrayList<Object> res = new ArrayList<Object>();
    +        for (FieldInfo f : includeFieldInfo) {
    +          res.add(getConvertedData(resultSet.getObject(f.getColumnName()), f));
    +        }
    +        return res;
    +      } else {
    +        return null;
    +      }
    +    } catch (SQLException e) {
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  private Object getConvertedData(Object object, FieldInfo f)
    +  {
    +    if (f.getType().getJavaType() == object.getClass()) {
    +      return object;
    +    } else {
    +      logger.warn("Type mismatch seen for field {}, returning null", f.getColumnName());
    +      return null;
    +    }
    +  }
    +
    +  private String generateQueryStmt()
    +  {
    +    String stmt = "select * from " + tableName + " where ";
    +    boolean first = true;
    +    for (FieldInfo fieldInfo : lookupFieldInfo) {
    +      if (first) {
    +        first = false;
    +      } else {
    +        stmt += " and ";
    +      }
    +      stmt += fieldInfo.getColumnName() + " = ?";
    +    }
    +
    +    logger.info("generateQueryStmt: {}", stmt);
    +    return stmt;
    +  }
    +
    +  public String getQueryStmt()
    +  {
    +    return queryStmt;
    +  }
    +
    +  /**
    +   * Set the sql Prepared Statement if the enrichment mechanism is query based.
    +   */
    +  public void setQueryStmt(String queryStmt)
    +  {
    +    this.queryStmt = queryStmt;
    +  }
    +
    +  public String getTableName()
    +  {
    +    return tableName;
    +  }
    +
    +  /**
    +   * Set the table name.
    +   */
    +  public void setTableName(String tableName)
    +  {
    +    this.tableName = tableName;
    +  }
    +
    +  @Override
    +  public void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo)
    +  {
    +    this.lookupFieldInfo = lookupFieldInfo;
    +    this.includeFieldInfo = includeFieldInfo;
    +    if (queryStmt == null) {
    +      queryStmt = generateQueryStmt();
    +    }
    +  }
    +
    +  @Override
    +  public Map<Object, Object> loadInitialData()
    +  {
    +    return null;
    +  }
    +
    +  @Override
    +  public Object get(Object key)
    +  {
    +    return getDataFrmResult(getQueryResult(key));
    +  }
    +
    +  @Override
    +  public List<Object> getAll(List<Object> keys)
    +  {
    +    List<Object> values = Lists.newArrayList();
    +    for (Object key : keys) {
    +      values.add(get(key));
    +    }
    +    return values;
    +  }
    +
    +  @Override
    +  public void put(Object key, Object value)
    +  {
    +    throw new RuntimeException("Not supported operation");
    --- End diff --
    
    Should this throw UnsupportedOperationException instead?
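
For illustration, a read-only store that rejects writes with the standard exception might look like this. ReadOnlyStore is a hypothetical class, not the JDBCLoader from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical read-only store; not code from the pull request.
class ReadOnlyStore
{
  private final Map<Object, Object> data = new HashMap<>();

  ReadOnlyStore(Map<Object, Object> initial)
  {
    data.putAll(initial);
  }

  Object get(Object key)
  {
    return data.get(key);
  }

  // Writes are rejected with the standard JDK exception for unsupported
  // operations rather than a bare RuntimeException.
  void put(Object key, Object value)
  {
    throw new UnsupportedOperationException("put is not supported by a read-only store");
  }
}
```

UnsupportedOperationException is itself a RuntimeException, so existing callers that catch RuntimeException keep working, while new callers can catch the more specific type.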


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60207749
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.db.cache.CacheManager;
    +import com.datatorrent.lib.db.cache.CacheStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +
    +/**
    + * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
    + * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
    + * <p>
    + * Properties:<br>
    + * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
    + * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
    + * <b>store</b>: Specify the type of loader for looking data<br>
    + * <br>
    + *
    + * @param <INPUT>  Type of tuples which are received by this operator</T>
    + * @param <OUTPUT> Type of tuples which are emitted by this operator</T>
    + * @displayName Abstract Enrichment Operator
    + * @tags Enrichment
    + */
    +@InterfaceStability.Evolving
    +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
    +{
    +  /**
    +   * Mandatory parameters for Enricher
    +   */
    +  @NotNull
    +  protected List<String> lookupFields;
    +  @NotNull
    +  protected List<String> includeFields;
    +  @NotNull
    +  private BackendLoader store;
    +
    +  /**
    +   * Optional parameters for enricher.
    +   */
    +  private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
    +  private int cacheCleanupInterval = 24 * 60 * 60 * 1000;
    --- End diff --
    
    Can we reduce that to 1hr or less?
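
For reference, a small sketch of the two values being discussed, written with `TimeUnit` so the unit is explicit. The class and constant names are hypothetical; only the 24-hour expression comes from the diff.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical holder class, used only to compare the two intervals.
class CacheIntervalsSketch
{
  // Default in the diff: 24 hours, expressed in milliseconds.
  static final int DIFF_DEFAULT_MS = 24 * 60 * 60 * 1000;

  // One hour in milliseconds, the upper bound suggested in this comment.
  static final int ONE_HOUR_MS = (int)TimeUnit.HOURS.toMillis(1);
}
```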


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r61614306
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java ---
    @@ -0,0 +1,181 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.codehaus.jackson.JsonProcessingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +import org.codehaus.jackson.map.ObjectReader;
    +import org.codehaus.jackson.type.TypeReference;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.google.common.collect.Maps;
    +import com.datatorrent.lib.util.FieldInfo;
    +
    +
    +/**
    + * This implementation of {@link BackendLoader} loads the data from given file, puts in memory cache and serves the
    + * queries from the same cache.
    + * When this becomes part of cache manager, it can call {@link #loadInitialData()} periodically reload the file.
    + * <p>
    + * The format of the file looks like following:
    + * <p>
    + * {"productCategory": 5, "productId": 0}
    + * {"productCategory": 4, "productId": 1}
    + * {"productCategory": 5, "productId": 2}
    + * {"productCategory": 5, "productId": 3}
    + * </p>
    + * Each line in a file should be a valid json object which represent the record and each key/value pair in that json object
    + * represents the fields/value.
    + * <p>
    + * NOTE: This loader should be used with caution as all the data present in the file is loaded in memory.
    + */
    +@InterfaceStability.Evolving
    +public class FSLoader extends ReadOnlyBackup
    +{
    +  @NotNull
    +  private String fileName;
    +
    +  private transient Path filePath;
    +  private transient FileSystem fs;
    +  private transient boolean connected;
    +
    +  private static final ObjectMapper mapper = new ObjectMapper();
    +  private static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>(){});
    +  private static final Logger logger = LoggerFactory.getLogger(FSLoader.class);
    +
    +  public String getFileName()
    +  {
    +    return fileName;
    +  }
    +
    +  public void setFileName(String fileName)
    +  {
    +    this.fileName = fileName;
    +  }
    +
    +  @Override
    +  public Map<Object, Object> loadInitialData()
    +  {
    +    Map<Object, Object> result = null;
    +    FSDataInputStream in = null;
    +    BufferedReader bin = null;
    +    try {
    +      result = Maps.newHashMap();
    +      in = fs.open(filePath);
    +      bin = new BufferedReader(new InputStreamReader(in));
    +      String line;
    +      while ((line = bin.readLine()) != null) {
    +        try {
    +          Map<String, Object> tuple = reader.readValue(line);
    +          result.put(getKey(tuple), getValue(tuple));
    +        } catch (JsonProcessingException parseExp) {
    +          logger.info("Unable to parse line {}", line);
    +        }
    +      }
    +    } catch (IOException e) {
    +      throw new RuntimeException(e);
    +    } finally {
    +      if (bin != null) {
    +        IOUtils.closeQuietly(bin);
    +      }
    +      if (in != null) {
    +        IOUtils.closeQuietly(in);
    +      }
    +      try {
    +        fs.close();
    +      } catch (IOException e) {
    +        throw new RuntimeException(e);
    +      }
    +    }
    +    logger.debug("loading initial data {}", result.size());
    +    return result;
    +  }
    +
    +  private Object getValue(Map<String, Object> tuple)
    +  {
    +    ArrayList<Object> includeTuple = new ArrayList<Object>();
    +    for (FieldInfo s : includeFieldInfo) {
    +      includeTuple.add(tuple.get(s.getColumnName()));
    +    }
    +    return includeTuple;
    +  }
    +
    +  private Object getKey(Map<String, Object> tuple)
    +  {
    +    ArrayList<Object> lst = new ArrayList<Object>();
    +    for (FieldInfo key : lookupFieldInfo) {
    +      lst.add(tuple.get(key.getColumnName()));
    +    }
    +    return lst;
    +  }
    +
    +  @Override
    +  public Object get(Object key)
    +  {
    +    return null;
    +  }
    +
    +  @Override
    +  public List<Object> getAll(List<Object> keys)
    +  {
    +    return null;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    Configuration conf = new Configuration();
    +    this.filePath = new Path(fileName);
    +    this.fs = FileSystem.newInstance(filePath.toUri(), conf);
    --- End diff --
    
    Considering the size of the file, I think it might just be more efficient to reload the file.
    FSLoader is meant to be used only when the file is small enough to be loaded into memory.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60209029
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.db.cache.CacheManager;
    +import com.datatorrent.lib.db.cache.CacheStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +
    +/**
    + * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
    + * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
    + * <p>
    + * Properties:<br>
    + * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
    + * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
    + * <b>store</b>: Specify the type of loader for looking data<br>
    + * <br>
    + *
    + * @param <INPUT>  Type of tuples which are received by this operator</T>
    + * @param <OUTPUT> Type of tuples which are emitted by this operator</T>
    + * @displayName Abstract Enrichment Operator
    + * @tags Enrichment
    + */
    +@InterfaceStability.Evolving
    +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
    +{
    +  /**
    +   * Mandatory parameters for Enricher
    +   */
    +  @NotNull
    +  protected List<String> lookupFields;
    +  @NotNull
    +  protected List<String> includeFields;
    +  @NotNull
    +  private BackendLoader store;
    +
    +  /**
    +   * Optional parameters for enricher.
    +   */
    +  private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
    +  private int cacheCleanupInterval = 24 * 60 * 60 * 1000;
    +  private int cacheSize = 1024;
    +
    +  /**
    +   * Helper variables.
    +   */
    +  private transient CacheManager cacheManager;
    +  protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
    +  protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
    +
    +  /**
    +   * This method needs to be called by implementing class for processing a tuple for enrichment.
    +   * The method will take th tuple through following stages:
    +   * <ol>
    +   * <li>Call {@link #getKey(Object)} to retrieve key fields for lookup</li>
    +   * <li>Using key fields call cache manager to retrieve for any key that is cached already</li>
    +   * <li>If not found in cache, it'll do a lookup in configured backend store</li>
    +   * <li>The retrieved fields will be passed to {@link #convert(Object, Object)} method to create the final object</li>
    +   * <li>Finally {@link #emitTuple(Object)} is called for emitting the tuple</li>
    +   * </ol>
    +   *
    +   * @param tuple Input tuple that needs to get processed for enrichment.
    +   */
    +  protected void enrichTuple(INPUT tuple)
    +  {
    +    Object key = getKey(tuple);
    +    if (key != null) {
    +      Object result = cacheManager.get(key);
    +      OUTPUT out = convert(tuple, result);
    +      emitTuple(out);
    +    }
    +  }
    +
    +  /**
    +   * The method should be implemented by concrete class which returns an ArrayList<Object> containing all the fields
    +   * which forms key part of lookup.
    +   * The order of field values should be same as the one set in {@link #lookupFields} variable.
    +   *
    +   * @param tuple Input tuple from which fields values for key needs to be fetched.
    +   * @return Should return ArrayList<Object> which has fields values forming keys in same order as {@link #lookupFields}
    +   */
    +  protected abstract Object getKey(INPUT tuple);
    +
    +  /**
    +   * The method should be implemented by concrete class.
    +   * This method is expected to take input tuple and a externally fetched object contained field to be enriched, and
    --- End diff --
    
    Typos: "a externally" should be "an externally", and "contained field" should be "containing fields".


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60867846
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java ---
    @@ -0,0 +1,201 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.ResultSetMetaData;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.collections.CollectionUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.google.common.collect.Lists;
    +import com.datatorrent.lib.db.jdbc.JdbcStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +
    +/**
    + * <p>HBaseLoader extends from {@link JdbcStore} uses JDBC to connect and implements BackendLoaders interface.</p> <br/>
    + * <p>
    + * Properties:<br>
    + * <b>queryStmt</b>: Sql Prepared Statement which needs to be executed<br>
    + * <b>tableName</b>: JDBC table name<br>
    + * <br>
    + */
    +@InterfaceStability.Evolving
    +public class JDBCLoader extends JdbcStore implements BackendLoader
    +{
    +  protected String queryStmt;
    +
    +  protected String tableName;
    +
    +  protected transient List<FieldInfo> includeFieldInfo;
    +  protected transient List<FieldInfo> lookupFieldInfo;
    +
    +  protected Object getQueryResult(Object key)
    +  {
    +    try {
    +      PreparedStatement getStatement = getConnection().prepareStatement(queryStmt);
    +      ArrayList<Object> keys = (ArrayList<Object>)key;
    +      for (int i = 0; i < keys.size(); i++) {
    +        getStatement.setObject(i + 1, keys.get(i));
    +      }
    +      return getStatement.executeQuery();
    +    } catch (SQLException e) {
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  protected ArrayList<Object> getDataFrmResult(Object result) throws RuntimeException
    +  {
    +    try {
    +      ResultSet resultSet = (ResultSet)result;
    +      if (resultSet.next()) {
    +        ResultSetMetaData rsdata = resultSet.getMetaData();
    +        // If the includefields is empty, populate it from ResultSetMetaData
    +        if (CollectionUtils.isEmpty(includeFieldInfo)) {
    +          if (includeFieldInfo == null) {
    +            includeFieldInfo = new ArrayList<>();
    +          }
    +          for (int i = 1; i <= rsdata.getColumnCount(); i++) {
    +            String columnName = rsdata.getColumnName(i);
    +            // TODO: Take care of type conversion.
    +            includeFieldInfo.add(new FieldInfo(columnName, columnName, FieldInfo.SupportType.OBJECT));
    +          }
    +        }
    +
    +        ArrayList<Object> res = new ArrayList<Object>();
    +        for (FieldInfo f : includeFieldInfo) {
    +          res.add(getConvertedData(resultSet.getObject(f.getColumnName()), f));
    +        }
    +        return res;
    +      } else {
    +        return null;
    +      }
    +    } catch (SQLException e) {
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  private Object getConvertedData(Object object, FieldInfo f)
    +  {
    +    if (f.getType().getJavaType() == object.getClass()) {
    +      return object;
    +    } else {
    +      logger.warn("Type mismatch seen for field {}, returning as it is", f.getColumnName());
    +      return null;
    +    }
    +  }
    +
    +  private String generateQueryStmt()
    +  {
    +    String stmt = "select * from " + tableName + " where ";
    +    boolean first = true;
    +    for (FieldInfo fieldInfo : lookupFieldInfo) {
    +      if (first) {
    +        first = false;
    +      } else {
    +        stmt += " and ";
    +      }
    +      stmt += fieldInfo.getColumnName() + " = ?";
    +    }
    +
    +    logger.info("generateQueryStmt: {}", stmt);
    +    return stmt;
    +  }
    +
    +  public String getQueryStmt()
    +  {
    +    return queryStmt;
    +  }
    +
    +  /**
    +   * Set the sql Prepared Statement if the enrichment mechanism is query based.
    +   */
    +  public void setQueryStmt(String queryStmt)
    +  {
    +    this.queryStmt = queryStmt;
    +  }
    +
    +  public String getTableName()
    +  {
    +    return tableName;
    +  }
    +
    +  /**
    +   * Set the table name.
    +   */
    +  public void setTableName(String tableName)
    +  {
    +    this.tableName = tableName;
    +  }
    +
    +  @Override
    +  public void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo)
    +  {
    +    this.lookupFieldInfo = lookupFieldInfo;
    +    this.includeFieldInfo = includeFieldInfo;
    +    if (queryStmt == null) {
    +      queryStmt = generateQueryStmt();
    +    }
    +  }
    +
    +  @Override
    +  public Map<Object, Object> loadInitialData()
    +  {
    +    return null;
    --- End diff --
    
    Would it be useful to make this optionally load initial data?
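
One way this could be sketched, assuming a boolean property controls the behaviour. `OptionalPreloadSketch`, `preloadEnabled`, and the in-memory `backend` map (standing in for the database table) are all hypothetical and not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical loader showing an opt-in initial load; not the PR's JDBCLoader.
class OptionalPreloadSketch
{
  // Stands in for the rows that a real loader would read from the database.
  private final Map<Object, Object> backend;

  // Hypothetical property; false keeps the behaviour shown in the diff.
  private boolean preloadEnabled;

  OptionalPreloadSketch(Map<Object, Object> backend)
  {
    this.backend = backend;
  }

  public void setPreloadEnabled(boolean preloadEnabled)
  {
    this.preloadEnabled = preloadEnabled;
  }

  public Map<Object, Object> loadInitialData()
  {
    if (!preloadEnabled) {
      // Same as the diff: nothing is preloaded, every lookup goes to the store.
      return null;
    }
    // Return a copy so the caller cannot mutate the backing data.
    return new HashMap<>(backend);
  }
}
```

Keeping the flag off by default would preserve the current lookup-on-demand behaviour for large tables.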


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60870647
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java ---
    @@ -0,0 +1,264 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +
    +/**
    + * This class takes a POJO as input and extract the value of the lookupKey configured
    + * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
    + * specified in the file/DB or based on include fieldMap are added to original tuple.
    + * This operator is App Builder schema support enabled. <br>
    + * <p>
    + * Properties:<br>
    + * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
    + * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
    + * <br>
    + * <p>
    + * Example
    + * The file contains data in json format, one entry per line. during setup entire file is read and
    + * kept in memory for quick lookup.
    + * If file contains following lines, and operator is configured with lookup key "productId"
    + * { "productId": 1, "productCategory": 3 }
    + * { "productId": 4, "productCategory": 10 }
    + * { "productId": 3, "productCategory": 1 }
    + * <p>
    + * And input tuple is
    + * { amount=10.0, channelId=4, productId=3 }
    + * <p>
    + * The tuple is modified as below before operator emits it on output port.
    + * { amount=10.0, channelId=4, productId=3, productCategory=1 }
    + *
    + * @displayName BeanEnrichment
    + * @category Database
    + * @tags enrichment, pojo, schema, lookup
    + */
    +@InterfaceStability.Evolving
    +public class POJOEnricher extends AbstractEnricher<Object, Object>
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(POJOEnricher.class);
    +
    +  /**
    +   * Helper fields
    +   */
    +  protected Class<?> inputClass;
    +  protected Class<?> outputClass;
    +  private transient Map<PojoUtils.Getter, PojoUtils.Setter> fieldMap = new HashMap<>();
    +  private transient List<PojoUtils.Setter> includeSetters = new ArrayList<>();
    +  private transient List<PojoUtils.Getter> lookupGetters = new ArrayList<>();
    +
    +  @InputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +
    +    @Override
    +    public void process(Object object)
    +    {
    +      processTuple(object);
    +    }
    +  };
    +
    +  @OutputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +  };
    +
    +  protected void processTuple(Object object)
    +  {
    +    enrichTuple(object);
    +  }
    +
    +  @Override
    +  protected Object getKey(Object tuple)
    +  {
    +    ArrayList<Object> keyList = new ArrayList<>();
    +    for (PojoUtils.Getter lookupGetter : lookupGetters) {
    +      keyList.add(lookupGetter.get(tuple));
    +    }
    +    return keyList;
    +  }
    +
    +  @Override
    +  protected Object convert(Object in, Object cached)
    +  {
    +    Object o;
    +
    +    try {
    +      o = outputClass.newInstance();
    +    } catch (InstantiationException | IllegalAccessException e) {
    +      logger.error("Failed to create new instance of output POJO", e);
    +      return null;
    --- End diff --
    
    Can we add a metric here?
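
A rough sketch of counting these failures, using a plain counter field. `ConvertErrorMetricSketch` and `instantiationErrorCount` are hypothetical names; how the counter would actually be published (for example through the platform's metric support) is an assumption and is left out here.

```java
// Hypothetical helper that counts failed output instantiations; not the PR's POJOEnricher.
class ConvertErrorMetricSketch
{
  // Number of tuples dropped because the output object could not be created.
  private long instantiationErrorCount;

  Object newOutput(Class<?> outputClass)
  {
    try {
      // Requires an accessible no-argument constructor on outputClass.
      return outputClass.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      // Count the failure instead of only logging it.
      instantiationErrorCount++;
      return null;
    }
  }

  long getInstantiationErrorCount()
  {
    return instantiationErrorCount;
  }
}
```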


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60870872
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java ---
    @@ -0,0 +1,264 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +
    +/**
    + * This class takes a POJO as input and extract the value of the lookupKey configured
    + * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
    + * specified in the file/DB or based on include fieldMap are added to original tuple.
    + * This operator is App Builder schema support enabled. <br>
    + * <p>
    + * Properties:<br>
    + * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
    + * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
    + * <br>
    + * <p>
    + * Example
    + * The file contains data in json format, one entry per line. during setup entire file is read and
    + * kept in memory for quick lookup.
    + * If file contains following lines, and operator is configured with lookup key "productId"
    + * { "productId": 1, "productCategory": 3 }
    + * { "productId": 4, "productCategory": 10 }
    + * { "productId": 3, "productCategory": 1 }
    + * <p>
    + * And input tuple is
    + * { amount=10.0, channelId=4, productId=3 }
    + * <p>
    + * The tuple is modified as below before operator emits it on output port.
    + * { amount=10.0, channelId=4, productId=3, productCategory=1 }
    + *
    + * @displayName BeanEnrichment
    + * @category Database
    + * @tags enrichment, pojo, schema, lookup
    + */
    +@InterfaceStability.Evolving
    +public class POJOEnricher extends AbstractEnricher<Object, Object>
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(POJOEnricher.class);
    +
    +  /**
    +   * Helper fields
    +   */
    +  protected Class<?> inputClass;
    +  protected Class<?> outputClass;
    +  private transient Map<PojoUtils.Getter, PojoUtils.Setter> fieldMap = new HashMap<>();
    +  private transient List<PojoUtils.Setter> includeSetters = new ArrayList<>();
    +  private transient List<PojoUtils.Getter> lookupGetters = new ArrayList<>();
    +
    +  @InputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +
    +    @Override
    +    public void process(Object object)
    +    {
    +      processTuple(object);
    +    }
    +  };
    +
    +  @OutputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +  };
    +
    +  protected void processTuple(Object object)
    +  {
    +    enrichTuple(object);
    +  }
    +
    +  @Override
    +  protected Object getKey(Object tuple)
    +  {
    +    ArrayList<Object> keyList = new ArrayList<>();
    +    for (PojoUtils.Getter lookupGetter : lookupGetters) {
    +      keyList.add(lookupGetter.get(tuple));
    +    }
    +    return keyList;
    +  }
    +
    +  @Override
    +  protected Object convert(Object in, Object cached)
    +  {
    +    Object o;
    +
    +    try {
    +      o = outputClass.newInstance();
    +    } catch (InstantiationException | IllegalAccessException e) {
    +      logger.error("Failed to create new instance of output POJO", e);
    +      return null;
    +    }
    +
    +    for (Map.Entry<PojoUtils.Getter, PojoUtils.Setter> entry : fieldMap.entrySet()) {
    +      entry.getValue().set(o, entry.getKey().get(in));
    +    }
    +
    +    if (cached == null) {
    +      return null;
    --- End diff --
    
    Also add metrics for enriched and not-enriched tuples.
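
    A minimal sketch of the two counters this comment seems to ask for. The class and method names are hypothetical and not part of the PR; a null lookup result is assumed to mean "not enriched", mirroring the `cached == null` check in convert().

```java
// Hypothetical helper, not part of the PR: counts tuples by enrichment outcome.
class EnrichmentCounters
{
  private long enrichedCount;
  private long notEnrichedCount;

  // 'cached' is the looked-up value; null is treated as "no match found".
  void record(Object cached)
  {
    if (cached == null) {
      notEnrichedCount++;
    } else {
      enrichedCount++;
    }
  }

  long getEnrichedCount()
  {
    return enrichedCount;
  }

  long getNotEnrichedCount()
  {
    return notEnrichedCount;
  }
}
```

    In the operator itself the two fields would probably live on the enricher and be exposed through whatever metric mechanism the targeted Apex version offers; that wiring is left out here.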


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60207807
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.db.cache.CacheManager;
    +import com.datatorrent.lib.db.cache.CacheStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +
    +/**
    + * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
    + * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
    + * <p>
    + * Properties:<br>
    + * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
    + * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
    + * <b>store</b>: Specify the type of loader for looking data<br>
    + * <br>
    + *
    + * @param <INPUT>  Type of tuples which are received by this operator</T>
    + * @param <OUTPUT> Type of tuples which are emitted by this operator</T>
    + * @displayName Abstract Enrichment Operator
    + * @tags Enrichment
    + */
    +@InterfaceStability.Evolving
    +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
    +{
    +  /**
    +   * Mandatory parameters for Enricher
    +   */
    +  @NotNull
    +  protected List<String> lookupFields;
    +  @NotNull
    +  protected List<String> includeFields;
    +  @NotNull
    +  private BackendLoader store;
    +
    +  /**
    +   * Optional parameters for enricher.
    +   */
    +  private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
    --- End diff --
    
    Reduce to 1hr or less?
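
    For scale: the default in the diff works out to 86,400,000 ms (24 hours), while one hour is 3,600,000 ms. A small sketch of the arithmetic, using a hypothetical holder class; only `TimeUnit` is a real JDK API here.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical holder class used only to compare the two values.
class CacheIntervals
{
  // Default in the diff: 24 * 60 * 60 * 1000 ms, i.e. 24 hours.
  static final int CURRENT_DEFAULT_MS = 24 * 60 * 60 * 1000;

  // Suggested upper bound from the review: one hour in milliseconds.
  static final int ONE_HOUR_MS = (int)TimeUnit.HOURS.toMillis(1);
}
```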


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60871105
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java ---
    @@ -0,0 +1,264 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +
    +/**
    + * This class takes a POJO as input and extract the value of the lookupKey configured
    + * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
    + * specified in the file/DB or based on include fieldMap are added to original tuple.
    + * This operator is App Builder schema support enabled. <br>
    + * <p>
    + * Properties:<br>
    + * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
    + * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
    + * <br>
    + * <p>
    + * Example
    + * The file contains data in json format, one entry per line. during setup entire file is read and
    + * kept in memory for quick lookup.
    + * If file contains following lines, and operator is configured with lookup key "productId"
    + * { "productId": 1, "productCategory": 3 }
    + * { "productId": 4, "productCategory": 10 }
    + * { "productId": 3, "productCategory": 1 }
    + * <p>
    + * And input tuple is
    + * { amount=10.0, channelId=4, productId=3 }
    + * <p>
    + * The tuple is modified as below before operator emits it on output port.
    + * { amount=10.0, channelId=4, productId=3, productCategory=1 }
    + *
    + * @displayName BeanEnrichment
    + * @category Database
    + * @tags enrichment, pojo, schema, lookup
    + */
    +@InterfaceStability.Evolving
    +public class POJOEnricher extends AbstractEnricher<Object, Object>
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(POJOEnricher.class);
    +
    +  /**
    +   * Helper fields
    +   */
    +  protected Class<?> inputClass;
    +  protected Class<?> outputClass;
    +  private transient Map<PojoUtils.Getter, PojoUtils.Setter> fieldMap = new HashMap<>();
    +  private transient List<PojoUtils.Setter> includeSetters = new ArrayList<>();
    +  private transient List<PojoUtils.Getter> lookupGetters = new ArrayList<>();
    +
    +  @InputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +
    +    @Override
    +    public void process(Object object)
    +    {
    +      processTuple(object);
    +    }
    +  };
    +
    +  @OutputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +  };
    +
    +  protected void processTuple(Object object)
    +  {
    +    enrichTuple(object);
    +  }
    +
    +  @Override
    +  protected Object getKey(Object tuple)
    +  {
    +    ArrayList<Object> keyList = new ArrayList<>();
    +    for (PojoUtils.Getter lookupGetter : lookupGetters) {
    +      keyList.add(lookupGetter.get(tuple));
    +    }
    +    return keyList;
    +  }
    +
    +  @Override
    +  protected Object convert(Object in, Object cached)
    +  {
    +    Object o;
    +
    +    try {
    +      o = outputClass.newInstance();
    +    } catch (InstantiationException | IllegalAccessException e) {
    +      logger.error("Failed to create new instance of output POJO", e);
    +      return null;
    +    }
    +
    +    for (Map.Entry<PojoUtils.Getter, PojoUtils.Setter> entry : fieldMap.entrySet()) {
    +      entry.getValue().set(o, entry.getKey().get(in));
    +    }
    +
    +    if (cached == null) {
    +      return null;
    +    }
    +
    +    ArrayList<Object> includeObjects = (ArrayList<Object>)cached;
    +    int idx = 0;
    +    for (PojoUtils.Setter includeSetter : includeSetters) {
    +      try {
    +        includeSetter.set(o, includeObjects.get(idx++));
    +      } catch (RuntimeException e) {
    +        logger.error("Failed to set the property. Continuing with default.", e);
    +      }
    --- End diff --
    
    Use a for loop with idx.
    On error, redirect the tuple to an error port and increment a metric (metric++).
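
    A rough sketch of the suggested shape, assuming a stand-in `FieldSetter` interface in place of `PojoUtils.Setter` and a plain list in place of an error port. All names below are hypothetical and the PR may resolve this differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: indexed loop over setters with error redirection.
class IndexedSetterLoop
{
  // Stand-in for PojoUtils.Setter so the sketch compiles without Malhar.
  interface FieldSetter
  {
    void set(Map<String, Object> target, Object value);
  }

  long errorCount;                                     // the "metric++" part
  final List<Object> errorTuples = new ArrayList<>();  // stand-in for an error port

  // Returns the enriched tuple, or null if it was redirected to errorTuples.
  Map<String, Object> apply(Map<String, Object> out, List<FieldSetter> setters, List<Object> values)
  {
    for (int idx = 0; idx < setters.size(); idx++) {
      try {
        setters.get(idx).set(out, values.get(idx));
      } catch (RuntimeException e) {
        errorCount++;
        errorTuples.add(out);
        return null;
      }
    }
    return out;
  }
}
```

    Compared with the `idx++` inside the enhanced for loop, the index and the setter now advance together in one place, and a failing setter stops the tuple instead of emitting it half-populated.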


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60868927
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java ---
    @@ -0,0 +1,150 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.lib.util.FieldInfo;
    +
    +/**
    + * This class takes a HashMap tuple as input and extract the value of the lookupKey configured
    + * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
    + * specified in the file/DB or based on include fields are added to original tuple.
    + * <p>
    + * Example
    + * The file contains data in json format, one entry per line. during setup entire file is read and
    + * kept in memory for quick lookup.
    + * If file contains following lines, and operator is configured with lookup key "productId"
    + * <p>
    + * { "productId": 1, "productCategory": 3 }
    + * { "productId": 4, "productCategory": 10 }
    + * { "productId": 3, "productCategory": 1 }
    + * </p>
    + * And input tuple is
    + * <p>
    + * { amount=10.0, channelId=4, productId=3 }
    + * </p>
    + * The tuple is modified as below before operator emits it on output port.
    + * <p>
    + * { amount=10.0, channelId=4, productId=3, productCategory=1 }
    + * </p>
    + *
    + * @displayName MapEnrichment
    + * @category Database
    + * @tags enrichment, lookup, map
    + */
    +@InterfaceStability.Evolving
    +public class MapEnricher extends AbstractEnricher<Map<String, Object>, Map<String, Object>>
    +{
    +  public final transient DefaultInputPort<Map<String, Object>> input = new DefaultInputPort<Map<String, Object>>()
    +  {
    +    @Override
    +    public void process(Map<String, Object> obj)
    +    {
    +      processTuple(obj);
    +    }
    +  };
    +
    +  public final transient DefaultOutputPort<Map<String, Object>> output = new DefaultOutputPort<>();
    +
    +  private void processTuple(Map<String, Object> obj)
    --- End diff --
    
    Protected?


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60871839
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java ---
    @@ -0,0 +1,264 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +
    +/**
    + * This class takes a POJO as input and extract the value of the lookupKey configured
    + * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
    + * specified in the file/DB or based on include fieldMap are added to original tuple.
    + * This operator is App Builder schema support enabled. <br>
    + * <p>
    + * Properties:<br>
    + * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
    + * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
    + * <br>
    + * <p>
    + * Example
    + * The file contains data in json format, one entry per line. during setup entire file is read and
    + * kept in memory for quick lookup.
    + * If file contains following lines, and operator is configured with lookup key "productId"
    + * { "productId": 1, "productCategory": 3 }
    + * { "productId": 4, "productCategory": 10 }
    + * { "productId": 3, "productCategory": 1 }
    + * <p>
    + * And input tuple is
    + * { amount=10.0, channelId=4, productId=3 }
    + * <p>
    + * The tuple is modified as below before operator emits it on output port.
    + * { amount=10.0, channelId=4, productId=3, productCategory=1 }
    + *
    + * @displayName BeanEnrichment
    + * @category Database
    + * @tags enrichment, pojo, schema, lookup
    + */
    +@InterfaceStability.Evolving
    +public class POJOEnricher extends AbstractEnricher<Object, Object>
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(POJOEnricher.class);
    +
    +  /**
    +   * Helper fields
    +   */
    +  protected Class<?> inputClass;
    +  protected Class<?> outputClass;
    +  private transient Map<PojoUtils.Getter, PojoUtils.Setter> fieldMap = new HashMap<>();
    +  private transient List<PojoUtils.Setter> includeSetters = new ArrayList<>();
    +  private transient List<PojoUtils.Getter> lookupGetters = new ArrayList<>();
    +
    +  @InputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +
    +    @Override
    +    public void process(Object object)
    +    {
    +      processTuple(object);
    +    }
    +  };
    +
    +  @OutputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +  };
    +
    +  protected void processTuple(Object object)
    +  {
    +    enrichTuple(object);
    +  }
    +
    +  @Override
    +  protected Object getKey(Object tuple)
    +  {
    +    ArrayList<Object> keyList = new ArrayList<>();
    +    for (PojoUtils.Getter lookupGetter : lookupGetters) {
    +      keyList.add(lookupGetter.get(tuple));
    +    }
    +    return keyList;
    +  }
    +
    +  @Override
    +  protected Object convert(Object in, Object cached)
    +  {
    +    Object o;
    +
    +    try {
    +      o = outputClass.newInstance();
    +    } catch (InstantiationException | IllegalAccessException e) {
    +      logger.error("Failed to create new instance of output POJO", e);
    +      return null;
    +    }
    +
    +    for (Map.Entry<PojoUtils.Getter, PojoUtils.Setter> entry : fieldMap.entrySet()) {
    +      entry.getValue().set(o, entry.getKey().get(in));
    +    }
    +
    +    if (cached == null) {
    +      return null;
    +    }
    +
    +    ArrayList<Object> includeObjects = (ArrayList<Object>)cached;
    +    int idx = 0;
    +    for (PojoUtils.Setter includeSetter : includeSetters) {
    +      try {
    +        includeSetter.set(o, includeObjects.get(idx++));
    +      } catch (RuntimeException e) {
    +        logger.error("Failed to set the property. Continuing with default.", e);
    +      }
    +    }
    +
    +    return o;
    +  }
    +
    +  @Override
    +  protected void emitTuple(Object tuple)
    +  {
    +    output.emit(tuple);
    +  }
    +
    +  @Override
    +  protected Class<?> getIncludeFieldType(String fieldName)
    +  {
    +    try {
    +      return outputClass.getDeclaredField(fieldName).getType();
    +    } catch (NoSuchFieldException e) {
    +      logger.warn("Failed to find given fieldName, returning object type", e);
    +      return Object.class;
    +    }
    +  }
    +
    +  @Override
    +  protected Class<?> getLookupFieldType(String fieldName)
    +  {
    +    try {
    +      return inputClass.getDeclaredField(fieldName).getType();
    +    } catch (NoSuchFieldException e) {
    +      logger.warn("Failed to find given fieldName, returning object type", e);
    +      return Object.class;
    +    }
    +  }
    +
    +  @SuppressWarnings({ "unchecked", "rawtypes" })
    +  private PojoUtils.Setter generateSettersForField(Class<?> klass, String outputFieldName)
    +      throws NoSuchFieldException, SecurityException
    +  {
    +    Field f = outputClass.getDeclaredField(outputFieldName);
    +    Class c = ClassUtils.primitiveToWrapper(f.getType());
    +    PojoUtils.Setter classSetter = PojoUtils.createSetter(klass, outputFieldName, c);
    +    return classSetter;
    +  }
    +
    +  @SuppressWarnings({ "unchecked", "rawtypes" })
    +  private PojoUtils.Getter generateGettersForField(Class<?> klass, String inputFieldName)
    +      throws NoSuchFieldException, SecurityException
    +  {
    +    Field f = klass.getDeclaredField(inputFieldName);
    +    Class c = ClassUtils.primitiveToWrapper(f.getType());
    +    PojoUtils.Getter classGetter = PojoUtils.createGetter(klass, inputFieldName, c);
    +    return classGetter;
    +  }
    +
    +  @Override
    +  public void activate(Context context)
    +  {
    +    super.activate(context);
    +
    +    for (Field field : inputClass.getFields()) {
    +      try {
    +        fieldMap.put(generateGettersForField(inputClass, field.getName()),
    +            generateSettersForField(outputClass, field.getName()));
    +      } catch (NoSuchFieldException e) {
    +        logger.warn("Unable to find field with name {}, ignoring that field. Exception: {}", field.getName(), e);
    --- End diff --
    
    Throw runtime exception.
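
    A minimal sketch of failing fast instead of logging and skipping the field. `FailFastFieldLookup` and `Sample` are hypothetical names used only for illustration; the reflection calls are standard JDK.

```java
import java.lang.reflect.Field;

// Hypothetical sketch: surface a missing field as an unchecked exception.
class FailFastFieldLookup
{
  // Resolves a declared field, or fails immediately with the original cause attached.
  static Field requireField(Class<?> klass, String fieldName)
  {
    try {
      return klass.getDeclaredField(fieldName);
    } catch (NoSuchFieldException e) {
      throw new RuntimeException("Unable to find field " + fieldName + " in " + klass.getName(), e);
    }
  }

  // Tiny POJO used only to exercise the lookup.
  static class Sample
  {
    public int productId;
  }
}
```

    With this shape a misconfigured schema would stop the operator during activation rather than silently producing output tuples with missing fields.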


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60208201
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.db.cache.CacheManager;
    +import com.datatorrent.lib.db.cache.CacheStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +
    +/**
    + * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
    + * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
    + * <p>
    + * Properties:<br>
    + * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
    + * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
    + * <b>store</b>: Specify the type of loader for looking data<br>
    + * <br>
    + *
    + * @param <INPUT>  Type of tuples which are received by this operator</T>
    + * @param <OUTPUT> Type of tuples which are emitted by this operator</T>
    + * @displayName Abstract Enrichment Operator
    + * @tags Enrichment
    + */
    +@InterfaceStability.Evolving
    +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
    +{
    +  /**
    +   * Mandatory parameters for Enricher
    +   */
    +  @NotNull
    +  protected List<String> lookupFields;
    +  @NotNull
    +  protected List<String> includeFields;
    +  @NotNull
    +  private BackendLoader store;
    +
    +  /**
    +   * Optional parameters for enricher.
    +   */
    +  private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
    +  private int cacheCleanupInterval = 24 * 60 * 60 * 1000;
    +  private int cacheSize = 1024;
    +
    +  /**
    +   * Helper variables.
    +   */
    +  private transient CacheManager cacheManager;
    +  protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
    +  protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
    +
    +  /**
    +   * This method needs to be called by implementing class for processing a tuple for enrichment.
    +   * The method will take th tuple through following stages:
    +   * <ol>
    +   * <li>Call {@link #getKey(Object)} to retrieve key fields for lookup</li>
    +   * <li>Using key fields call cache manager to retrieve for any key that is cached already</li>
    +   * <li>If not found in cache, it'll do a lookup in configured backend store</li>
    +   * <li>The retrieved fields will be passed to {@link #convert(Object, Object)} method to create the final object</li>
    +   * <li>Finally {@link #emitTuple(Object)} is called for emitting the tuple</li>
    +   * </ol>
    +   *
    --- End diff --
    
    What if the lookup fails? Will the enricher emit the tuple on an error port?
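
    One possible behaviour, sketched under assumptions: the five stages from the javadoc above, with a lookup miss routed to a separate error list. Plain maps and lists stand in for the cache manager, the backend store and the ports; none of these names come from the PR, and the PR does not say what actually happens on a miss.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the enrich flow with an explicit miss path.
class LookupFlowSketch
{
  final Map<Object, Object> cache = new HashMap<>();              // stand-in for the cache manager
  final Map<Object, Object> backend = new HashMap<>();            // stand-in for the backend store
  final List<Map<String, Object>> emitted = new ArrayList<>();    // stand-in for the output port
  final List<Map<String, Object>> errors = new ArrayList<>();     // hypothetical error port

  void enrichTuple(Map<String, Object> tuple, String lookupField, String includeField)
  {
    Object key = tuple.get(lookupField);      // 1. extract the lookup key
    Object found = cache.get(key);            // 2. try the cache first
    if (found == null) {
      found = backend.get(key);               // 3. fall back to the backend store
      if (found != null) {
        cache.put(key, found);
      }
    }
    if (found == null) {
      errors.add(tuple);                      // miss: route the original tuple to the error list
      return;
    }
    Map<String, Object> out = new HashMap<>(tuple);
    out.put(includeField, found);             // 4. build the enriched tuple
    emitted.add(out);                         // 5. emit it
  }
}
```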


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60873281
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java ---
    @@ -0,0 +1,271 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.commons.collections.CollectionUtils;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.collect.Maps;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.lib.io.ConsoleOutputOperator;
    +import com.datatorrent.lib.testbench.CollectorTestSink;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.TestUtils;
    +
    +public class MapEnricherTest
    +{
    +  @Test
    +  public void includeAllKeys()
    +  {
    +    MapEnricher oper = new MapEnricher();
    +    oper.setStore(new MemoryStore());
    +    oper.setLookupFields(Arrays.asList("In1"));
    +    oper.setup(null);
    +
    +    CollectorTestSink sink = new CollectorTestSink();
    +    TestUtils.setSink(oper.output, sink);
    +
    +    Map<String, Object> inMap = Maps.newHashMap();
    +    inMap.put("In1", "Value1");
    +    inMap.put("In2", "Value2");
    +
    +    oper.activate(null);
    +    oper.beginWindow(1);
    +    oper.input.process(inMap);
    +    oper.endWindow();
    +    oper.deactivate();
    +
    +    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
    +    Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, C=Val_C, In2=Value2, In1=Value3}",
    +        sink.collectedTuples.get(0).toString());
    +  }
    +
    +  @Test
    +  public void includeSelectedKeys()
    +  {
    +    MapEnricher oper = new MapEnricher();
    +    oper.setStore(new MemoryStore());
    +    oper.setLookupFields(Arrays.asList("In1"));
    +    oper.setIncludeFields(Arrays.asList("A", "B"));
    +    oper.setup(null);
    +
    +    CollectorTestSink sink = new CollectorTestSink();
    +    TestUtils.setSink(oper.output, sink);
    +
    +    Map<String, Object> inMap = Maps.newHashMap();
    +    inMap.put("In1", "Value1");
    +    inMap.put("In2", "Value2");
    +
    +    oper.activate(null);
    +    oper.beginWindow(1);
    +    oper.input.process(inMap);
    +    oper.endWindow();
    +    oper.deactivate();
    +
    +    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
    +    Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, In2=Value2, In1=Value1}",
    +        sink.collectedTuples.get(0).toString());
    +  }
    +
    +  @Test
    +  public void testApplication() throws Exception
    +  {
    +    LocalMode lma = LocalMode.newInstance();
    +    Configuration conf = new Configuration(false);
    +    lma.prepareDAG(new EnrichApplication(), conf);
    +    LocalMode.Controller lc = lma.getController();
    +    lc.run(10000);// runs for 10 seconds and quits
    +  }
    +
    +  public static class EnrichApplication implements StreamingApplication
    +  {
    +    @Override
    +    public void populateDAG(DAG dag, Configuration configuration)
    +    {
    +      RandomMapGenerator input = dag.addOperator("Input", RandomMapGenerator.class);
    +      MapEnricher enrich = dag.addOperator("Enrich", MapEnricher.class);
    +      ConsoleOutputOperator console = dag.addOperator("Console", ConsoleOutputOperator.class);
    +      console.setSilent(true);
    +
    +      List<String> includeFields = new ArrayList<>();
    +      includeFields.add("A");
    +      includeFields.add("B");
    +      List<String> lookupFields = new ArrayList<>();
    +      lookupFields.add("In1");
    +
    +      enrich.setStore(new MemoryStore());
    +      enrich.setIncludeFields(includeFields);
    +      enrich.setLookupFields(lookupFields);
    +
    +      dag.addStream("S1", input.output, enrich.input);
    +      dag.addStream("S2", enrich.output, console.input);
    +    }
    +  }
    +
    +  public static class RandomMapGenerator implements InputOperator
    --- End diff --
    
    Should this extend BaseOperator and implement InputOperator?
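
    The benefit of that pattern is that a base class with empty lifecycle methods lets the generator override only what it needs. The sketch below shows the idea with hypothetical stand-in types (LifecycleSketch, SourceSketch, BaseLifecycleSketch, GeneratorSketch); they are not the Apex Operator, InputOperator or BaseOperator types, and they are simplified so that the example compiles on its own.

```java
// Hypothetical stand-ins for the operator lifecycle; these are NOT the Apex interfaces.
interface LifecycleSketch
{
  void setup();

  void beginWindow(long windowId);

  void endWindow();

  void teardown();
}

interface SourceSketch extends LifecycleSketch
{
  void emitTuples();
}

// Plays the role of a base class that supplies empty lifecycle methods.
class BaseLifecycleSketch implements LifecycleSketch
{
  @Override
  public void setup()
  {
  }

  @Override
  public void beginWindow(long windowId)
  {
  }

  @Override
  public void endWindow()
  {
  }

  @Override
  public void teardown()
  {
  }
}

// Only the methods that do real work are overridden; no empty boilerplate remains.
final class GeneratorSketch extends BaseLifecycleSketch implements SourceSketch
{
  int emitted = 0;
  private boolean emit = true;

  @Override
  public void beginWindow(long windowId)
  {
    emit = true;
  }

  @Override
  public void emitTuples()
  {
    if (!emit) {
      return;
    }
    emitted++;
    emit = false; // one tuple per window in this sketch
  }
}
```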



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60206200
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.db.cache.CacheManager;
    +import com.datatorrent.lib.db.cache.CacheStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +
    +/**
    + * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
    + * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
    + * <p>
    + * Properties:<br>
    + * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
    + * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
    + * <b>store</b>: Specify the type of loader for looking data<br>
    + * <br>
    + *
    + * @param <INPUT>  Type of tuples which are received by this operator</T>
    + * @param <OUTPUT> Type of tuples which are emitted by this operator</T>
    + * @displayName Abstract Enrichment Operator
    + * @tags Enrichment
    + */
    +@InterfaceStability.Evolving
    +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
    +{
    +  /**
    +   * Mandatory parameters for Enricher
    +   */
    +  @NotNull
    +  protected List<String> lookupFields;
    +  @NotNull
    +  protected List<String> includeFields;
    +  @NotNull
    +  private BackendLoader store;
    +
    +  /**
    +   * Optional parameters for enricher.
    +   */
    +  private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
    +  private int cacheCleanupInterval = 24 * 60 * 60 * 1000;
    +  private int cacheSize = 1024;
    +
    +  /**
    +   * Helper variables.
    +   */
    +  private transient CacheManager cacheManager;
    +  protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
    +  protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
    +
    +  /**
    +   * This method needs to be called by implementing class for processing a tuple for enrichment.
    +   * The method will take th tuple through following stages:
    --- End diff --
    
    Typo: "th" should be "the".



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60868871
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java ---
    @@ -0,0 +1,150 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.lib.util.FieldInfo;
    +
    +/**
    + * This class takes a HashMap tuple as input and extract the value of the lookupKey configured
    + * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
    + * specified in the file/DB or based on include fields are added to original tuple.
    + * <p>
    + * Example
    + * The file contains data in json format, one entry per line. during setup entire file is read and
    + * kept in memory for quick lookup.
    + * If file contains following lines, and operator is configured with lookup key "productId"
    + * <p>
    --- End diff --
    
    Please update the example.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60870750
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java ---
    @@ -0,0 +1,264 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +
    +/**
    + * This class takes a POJO as input and extract the value of the lookupKey configured
    + * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
    + * specified in the file/DB or based on include fieldMap are added to original tuple.
    + * This operator is App Builder schema support enabled. <br>
    + * <p>
    + * Properties:<br>
    + * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
    + * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
    + * <br>
    + * <p>
    + * Example
    + * The file contains data in json format, one entry per line. during setup entire file is read and
    + * kept in memory for quick lookup.
    + * If file contains following lines, and operator is configured with lookup key "productId"
    + * { "productId": 1, "productCategory": 3 }
    + * { "productId": 4, "productCategory": 10 }
    + * { "productId": 3, "productCategory": 1 }
    + * <p>
    + * And input tuple is
    + * { amount=10.0, channelId=4, productId=3 }
    + * <p>
    + * The tuple is modified as below before operator emits it on output port.
    + * { amount=10.0, channelId=4, productId=3, productCategory=1 }
    + *
    + * @displayName BeanEnrichment
    + * @category Database
    + * @tags enrichment, pojo, schema, lookup
    + */
    +@InterfaceStability.Evolving
    +public class POJOEnricher extends AbstractEnricher<Object, Object>
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(POJOEnricher.class);
    +
    +  /**
    +   * Helper fields
    +   */
    +  protected Class<?> inputClass;
    +  protected Class<?> outputClass;
    +  private transient Map<PojoUtils.Getter, PojoUtils.Setter> fieldMap = new HashMap<>();
    +  private transient List<PojoUtils.Setter> includeSetters = new ArrayList<>();
    +  private transient List<PojoUtils.Getter> lookupGetters = new ArrayList<>();
    +
    +  @InputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +
    +    @Override
    +    public void process(Object object)
    +    {
    +      processTuple(object);
    +    }
    +  };
    +
    +  @OutputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +  };
    +
    +  protected void processTuple(Object object)
    +  {
    +    enrichTuple(object);
    +  }
    +
    +  @Override
    +  protected Object getKey(Object tuple)
    +  {
    +    ArrayList<Object> keyList = new ArrayList<>();
    +    for (PojoUtils.Getter lookupGetter : lookupGetters) {
    +      keyList.add(lookupGetter.get(tuple));
    +    }
    +    return keyList;
    +  }
    +
    +  @Override
    +  protected Object convert(Object in, Object cached)
    +  {
    +    Object o;
    +
    +    try {
    +      o = outputClass.newInstance();
    +    } catch (InstantiationException | IllegalAccessException e) {
    +      logger.error("Failed to create new instance of output POJO", e);
    +      return null;
    +    }
    +
    +    for (Map.Entry<PojoUtils.Getter, PojoUtils.Setter> entry : fieldMap.entrySet()) {
    +      entry.getValue().set(o, entry.getKey().get(in));
    +    }
    +
    +    if (cached == null) {
    +      return null;
    --- End diff --
    
    Should this return o instead of null?
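
    As a rough illustration of the difference, the sketch below returns the copied input on a cache miss rather than null, so the tuple can still be passed downstream un-enriched. It uses plain maps in place of POJOs and getters/setters, and the class ConvertSketch is hypothetical; it is not the convert method from this diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical map-based stand-in for the POJO conversion step.
final class ConvertSketch
{
  // Copies the input fields, then adds the cached fields if the lookup found any.
  static Map<String, Object> convert(Map<String, Object> in, Map<String, Object> cached)
  {
    Map<String, Object> out = new HashMap<>(in);
    if (cached == null) {
      // Cache miss: return the copy of the input instead of null.
      return out;
    }
    out.putAll(cached);
    return out;
  }
}
```

    Returning null here would instead signal "no output" to the caller, so which of the two is right depends on how the caller treats a null result.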



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60873987
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java ---
    @@ -0,0 +1,223 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.testbench.CollectorTestSink;
    +import com.datatorrent.lib.util.TestUtils;
    +
    +public class POJOEnricherTest extends JDBCLoaderTest
    +{
    +  @Test
    +  public void includeSelectedKeys()
    +  {
    +    POJOEnricher oper = new POJOEnricher();
    +    oper.setStore(testMeta.dbloader);
    +    oper.setLookupFields(Arrays.asList("ID"));
    +    oper.setIncludeFields(Arrays.asList("NAME", "AGE", "ADDRESS"));
    +    oper.outputClass = EmployeeOrder.class;
    +    oper.inputClass = Order.class;
    +    oper.setup(null);
    +
    +    CollectorTestSink sink = new CollectorTestSink();
    +    TestUtils.setSink(oper.output, sink);
    +
    +    oper.activate(null);
    +
    +    oper.beginWindow(1);
    +    Order tuple = new Order(3, 4, 700);
    +    oper.input.process(tuple);
    +    oper.endWindow();
    +
    +    oper.deactivate();
    +
    +    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
    +    Assert.assertEquals("Ouput Tuple: ",
    +        "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=0.0}",
    +        sink.collectedTuples.get(0).toString());
    +  }
    +
    +  @Test
    +  public void includeAllKeys()
    +  {
    +    POJOEnricher oper = new POJOEnricher();
    +    oper.setStore(testMeta.dbloader);
    +    oper.setLookupFields(Arrays.asList("ID"));
    +    oper.setIncludeFields(Arrays.asList("NAME", "AGE", "ADDRESS", "SALARY"));
    +    oper.outputClass = EmployeeOrder.class;
    +    oper.inputClass = Order.class;
    +    oper.setup(null);
    +
    +    CollectorTestSink sink = new CollectorTestSink();
    +    TestUtils.setSink(oper.output, sink);
    +
    +    oper.activate(null);
    +
    +    oper.beginWindow(1);
    +    Order tuple = new Order(3, 4, 700);
    +    oper.input.process(tuple);
    +    oper.endWindow();
    +
    +    oper.deactivate();
    +
    +    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
    +    Assert.assertEquals("Ouput Tuple: ",
    +        "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=65000.0}",
    +        sink.collectedTuples.get(0).toString());
    +  }
    +
    +  @Test
    +  public void testApplication() throws Exception
    +  {
    +    EnrichApplication enrichApplication = new EnrichApplication();
    +    enrichApplication.setLoader(testMeta.dbloader);
    +
    +    LocalMode lma = LocalMode.newInstance();
    +    Configuration conf = new Configuration(false);
    +    lma.prepareDAG(enrichApplication, conf);
    +    LocalMode.Controller lc = lma.getController();
    +    lc.run(10000);// runs for 10 seconds and quits
    +  }
    +
    +  public static class EnrichApplication implements StreamingApplication
    +  {
    +    BackendLoader loader;
    +
    +    @Override
    +    public void populateDAG(DAG dag, Configuration configuration)
    +    {
    +      RandomPOJOGenerator input = dag.addOperator("Input", RandomPOJOGenerator.class);
    +      POJOEnricher enrich = dag.addOperator("Enrich", POJOEnricher.class);
    +      EnrichVerifier verify = dag.addOperator("Verify", EnrichVerifier.class);
    +
    +      enrich.setStore(loader);
    +      ArrayList<String> lookupFields = new ArrayList<>();
    +      lookupFields.add("ID");
    +      ArrayList<String> includeFields = new ArrayList<>();
    +      includeFields.add("NAME");
    +      includeFields.add("AGE");
    +      includeFields.add("ADDRESS");
    +      includeFields.add("SALARY");
    +      enrich.setLookupFields(lookupFields);
    +      enrich.setIncludeFields(includeFields);
    +
    +      dag.getMeta(enrich).getMeta(enrich.input).getAttributes().put(Context.PortContext.TUPLE_CLASS, Order.class);
    +      dag.getMeta(enrich).getMeta(enrich.output).getAttributes()
    +          .put(Context.PortContext.TUPLE_CLASS, EmployeeOrder.class);
    +
    +      dag.addStream("S1", input.output, enrich.input);
    +      dag.addStream("S2", enrich.output, verify.input);
    +    }
    +
    +    public void setLoader(BackendLoader loader)
    +    {
    +      this.loader = loader;
    +    }
    +  }
    +
    +  public static class RandomPOJOGenerator implements InputOperator
    +  {
    +    public transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
    +    private int idx = 0;
    +    private boolean emit = true;
    +
    +    @Override
    +    public void emitTuples()
    +    {
    +      if (!emit) {
    +        return;
    +      }
    +      idx += idx++ % 4;
    +      Order o = new Order(1, idx + 1, 100.00);
    +      output.emit(o);
    +      if (idx == 3) {
    +        emit = false;
    +      }
    +    }
    +
    +    @Override
    +    public void beginWindow(long l)
    +    {
    +      emit = true;
    +    }
    +
    +    @Override
    +    public void endWindow()
    +    {
    +
    +    }
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +    }
    +
    +    @Override
    +    public void teardown()
    +    {
    +    }
    +  }
    +
    +  public static class EnrichVerifier extends BaseOperator
    +  {
    +    private static final Logger logger = LoggerFactory.getLogger(EnrichVerifier.class);
    +
    +    private transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
    +    {
    +      @Override
    +      public void process(Object o)
    +      {
    +        Assert.assertTrue(o instanceof EmployeeOrder);
    +        EmployeeOrder order = (EmployeeOrder)o;
    +        int id = order.getID();
    +        Assert.assertTrue(id >= 1 && id <= 4);
    +        Assert.assertEquals(1, order.getOID());
    +        Assert.assertEquals(100.00, order.getAmount(), 0);
    +
    +        String[] names = {"Paul", "Allen", "Teddy", "Mark"};
    --- End diff --
    
    Use a single variable that is shared between both places.
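
    One way to do that, sketched under assumptions: keep the expected names in a single constant that both the data setup and the verifier read. The holder class EmployeeFixtures and the helper nameForId are hypothetical, and the ID-to-name mapping (ID n maps to the n-th name) is only inferred from the test expectation that ID=4 yields 'Mark'.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical shared fixture; not a class from this PR.
final class EmployeeFixtures
{
  // Single source of truth for the employee names used by the tests.
  static final List<String> NAMES =
      Collections.unmodifiableList(Arrays.asList("Paul", "Allen", "Teddy", "Mark"));

  private EmployeeFixtures()
  {
  }

  // Assumes IDs start at 1 and follow the order of NAMES.
  static String nameForId(int id)
  {
    return NAMES.get(id - 1);
  }
}
```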



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60871574
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java ---
    @@ -0,0 +1,264 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +
    +/**
    + * This class takes a POJO as input and extract the value of the lookupKey configured
    + * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
    + * specified in the file/DB or based on include fieldMap are added to original tuple.
    + * This operator is App Builder schema support enabled. <br>
    + * <p>
    + * Properties:<br>
    + * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
    + * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
    + * <br>
    + * <p>
    + * Example
    + * The file contains data in json format, one entry per line. during setup entire file is read and
    + * kept in memory for quick lookup.
    + * If file contains following lines, and operator is configured with lookup key "productId"
    + * { "productId": 1, "productCategory": 3 }
    + * { "productId": 4, "productCategory": 10 }
    + * { "productId": 3, "productCategory": 1 }
    + * <p>
    + * And input tuple is
    + * { amount=10.0, channelId=4, productId=3 }
    + * <p>
    + * The tuple is modified as below before operator emits it on output port.
    + * { amount=10.0, channelId=4, productId=3, productCategory=1 }
    + *
    + * @displayName BeanEnrichment
    + * @category Database
    + * @tags enrichment, pojo, schema, lookup
    + */
    +@InterfaceStability.Evolving
    +public class POJOEnricher extends AbstractEnricher<Object, Object>
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(POJOEnricher.class);
    +
    +  /**
    +   * Helper fields
    +   */
    +  protected Class<?> inputClass;
    +  protected Class<?> outputClass;
    +  private transient Map<PojoUtils.Getter, PojoUtils.Setter> fieldMap = new HashMap<>();
    +  private transient List<PojoUtils.Setter> includeSetters = new ArrayList<>();
    +  private transient List<PojoUtils.Getter> lookupGetters = new ArrayList<>();
    +
    +  @InputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +
    +    @Override
    +    public void process(Object object)
    +    {
    +      processTuple(object);
    +    }
    +  };
    +
    +  @OutputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +  };
    +
    +  protected void processTuple(Object object)
    +  {
    +    enrichTuple(object);
    +  }
    +
    +  @Override
    +  protected Object getKey(Object tuple)
    +  {
    +    ArrayList<Object> keyList = new ArrayList<>();
    +    for (PojoUtils.Getter lookupGetter : lookupGetters) {
    +      keyList.add(lookupGetter.get(tuple));
    +    }
    +    return keyList;
    +  }
    +
    +  @Override
    +  protected Object convert(Object in, Object cached)
    +  {
    +    Object o;
    +
    +    try {
    +      o = outputClass.newInstance();
    +    } catch (InstantiationException | IllegalAccessException e) {
    +      logger.error("Failed to create new instance of output POJO", e);
    +      return null;
    +    }
    +
    +    for (Map.Entry<PojoUtils.Getter, PojoUtils.Setter> entry : fieldMap.entrySet()) {
    +      entry.getValue().set(o, entry.getKey().get(in));
    +    }
    +
    +    if (cached == null) {
    +      return null;
    +    }
    +
    +    ArrayList<Object> includeObjects = (ArrayList<Object>)cached;
    +    int idx = 0;
    +    for (PojoUtils.Setter includeSetter : includeSetters) {
    +      try {
    +        includeSetter.set(o, includeObjects.get(idx++));
    +      } catch (RuntimeException e) {
    +        logger.error("Failed to set the property. Continuing with default.", e);
    +      }
    +    }
    +
    +    return o;
    +  }
    +
    +  @Override
    +  protected void emitTuple(Object tuple)
    +  {
    +    output.emit(tuple);
    +  }
    +
    +  @Override
    +  protected Class<?> getIncludeFieldType(String fieldName)
    +  {
    +    try {
    +      return outputClass.getDeclaredField(fieldName).getType();
    +    } catch (NoSuchFieldException e) {
    +      logger.warn("Failed to find given fieldName, returning object type", e);
    +      return Object.class;
    +    }
    +  }
    +
    +  @Override
    +  protected Class<?> getLookupFieldType(String fieldName)
    +  {
    +    try {
    +      return inputClass.getDeclaredField(fieldName).getType();
    +    } catch (NoSuchFieldException e) {
    +      logger.warn("Failed to find given fieldName, returning object type", e);
    +      return Object.class;
    +    }
    +  }
    +
    +  @SuppressWarnings({ "unchecked", "rawtypes" })
    +  private PojoUtils.Setter generateSettersForField(Class<?> klass, String outputFieldName)
    +      throws NoSuchFieldException, SecurityException
    +  {
    +    Field f = outputClass.getDeclaredField(outputFieldName);
    +    Class c = ClassUtils.primitiveToWrapper(f.getType());
    +    PojoUtils.Setter classSetter = PojoUtils.createSetter(klass, outputFieldName, c);
    +    return classSetter;
    +  }
    +
    +  @SuppressWarnings({ "unchecked", "rawtypes" })
    +  private PojoUtils.Getter generateGettersForField(Class<?> klass, String inputFieldName)
    +      throws NoSuchFieldException, SecurityException
    +  {
    +    Field f = klass.getDeclaredField(inputFieldName);
    +    Class c = ClassUtils.primitiveToWrapper(f.getType());
    +    PojoUtils.Getter classGetter = PojoUtils.createGetter(klass, inputFieldName, c);
    --- End diff --
    
    Return the createGetter(...) result directly; the classGetter local variable is not needed.
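
The suggestion above amounts to dropping the temporary local and returning the factory call's result. A minimal self-contained sketch follows, assuming a reflection-based getter; `SketchGetter`, `SketchProduct`, and `ReturnDirectlySketch` are hypothetical stand-ins for illustration only and are not the Malhar `PojoUtils` API.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a generated getter; not the Malhar PojoUtils.Getter type.
interface SketchGetter
{
  Object get(Object pojo);
}

// Hypothetical POJO used only to exercise the sketch.
class SketchProduct
{
  public int productId = 3;
}

class ReturnDirectlySketch
{
  // Hypothetical reflection-based factory standing in for PojoUtils.createGetter.
  static SketchGetter createGetter(final Field field)
  {
    field.setAccessible(true);
    return new SketchGetter()
    {
      @Override
      public Object get(Object pojo)
      {
        try {
          return field.get(pojo);
        } catch (IllegalAccessException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  // The factory result is returned directly, with no intermediate local variable.
  static SketchGetter generateGetterForField(Class<?> klass, String fieldName)
      throws NoSuchFieldException
  {
    return createGetter(klass.getDeclaredField(fieldName));
  }
}
```

Behavior is unchanged by the edit; the method simply has one statement fewer to read.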


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60902381
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java ---
    @@ -0,0 +1,201 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.ResultSetMetaData;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.collections.CollectionUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.google.common.collect.Lists;
    +import com.datatorrent.lib.db.jdbc.JdbcStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +
    +/**
    + * <p>HBaseLoader extends from {@link JdbcStore} uses JDBC to connect and implements BackendLoaders interface.</p> <br/>
    + * <p>
    + * Properties:<br>
    + * <b>queryStmt</b>: Sql Prepared Statement which needs to be executed<br>
    + * <b>tableName</b>: JDBC table name<br>
    + * <br>
    + */
    +@InterfaceStability.Evolving
    +public class JDBCLoader extends JdbcStore implements BackendLoader
    +{
    +  protected String queryStmt;
    +
    +  protected String tableName;
    +
    +  protected transient List<FieldInfo> includeFieldInfo;
    +  protected transient List<FieldInfo> lookupFieldInfo;
    +
    +  protected Object getQueryResult(Object key)
    +  {
    +    try {
    +      PreparedStatement getStatement = getConnection().prepareStatement(queryStmt);
    +      ArrayList<Object> keys = (ArrayList<Object>)key;
    +      for (int i = 0; i < keys.size(); i++) {
    +        getStatement.setObject(i + 1, keys.get(i));
    +      }
    +      return getStatement.executeQuery();
    +    } catch (SQLException e) {
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  protected ArrayList<Object> getDataFrmResult(Object result) throws RuntimeException
    +  {
    +    try {
    +      ResultSet resultSet = (ResultSet)result;
    +      if (resultSet.next()) {
    +        ResultSetMetaData rsdata = resultSet.getMetaData();
    +        // If the includefields is empty, populate it from ResultSetMetaData
    +        if (CollectionUtils.isEmpty(includeFieldInfo)) {
    +          if (includeFieldInfo == null) {
    +            includeFieldInfo = new ArrayList<>();
    +          }
    +          for (int i = 1; i <= rsdata.getColumnCount(); i++) {
    +            String columnName = rsdata.getColumnName(i);
    +            // TODO: Take care of type conversion.
    +            includeFieldInfo.add(new FieldInfo(columnName, columnName, FieldInfo.SupportType.OBJECT));
    +          }
    +        }
    +
    +        ArrayList<Object> res = new ArrayList<Object>();
    +        for (FieldInfo f : includeFieldInfo) {
    +          res.add(getConvertedData(resultSet.getObject(f.getColumnName()), f));
    +        }
    +        return res;
    +      } else {
    +        return null;
    +      }
    +    } catch (SQLException e) {
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  private Object getConvertedData(Object object, FieldInfo f)
    +  {
    +    if (f.getType().getJavaType() == object.getClass()) {
    +      return object;
    +    } else {
    +      logger.warn("Type mismatch seen for field {}, returning as it is", f.getColumnName());
    +      return null;
    +    }
    +  }
    +
    +  private String generateQueryStmt()
    +  {
    +    String stmt = "select * from " + tableName + " where ";
    +    boolean first = true;
    +    for (FieldInfo fieldInfo : lookupFieldInfo) {
    +      if (first) {
    +        first = false;
    +      } else {
    +        stmt += " and ";
    +      }
    +      stmt += fieldInfo.getColumnName() + " = ?";
    +    }
    +
    +    logger.info("generateQueryStmt: {}", stmt);
    +    return stmt;
    +  }
    +
    +  public String getQueryStmt()
    +  {
    +    return queryStmt;
    +  }
    +
    +  /**
    +   * Set the sql Prepared Statement if the enrichment mechanism is query based.
    +   */
    +  public void setQueryStmt(String queryStmt)
    +  {
    +    this.queryStmt = queryStmt;
    +  }
    +
    +  public String getTableName()
    +  {
    +    return tableName;
    +  }
    +
    +  /**
    +   * Set the table name.
    +   */
    +  public void setTableName(String tableName)
    +  {
    +    this.tableName = tableName;
    +  }
    +
    +  @Override
    +  public void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo)
    +  {
    +    this.lookupFieldInfo = lookupFieldInfo;
    +    this.includeFieldInfo = includeFieldInfo;
    +    if (queryStmt == null) {
    +      queryStmt = generateQueryStmt();
    +    }
    +  }
    +
    +  @Override
    +  public Map<Object, Object> loadInitialData()
    +  {
    +    return null;
    --- End diff --
    
    I'm not sure if this is the best use of loadInitialData here.
    If one wants, this loader can be extended to override this method.
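
To make the extension point mentioned above concrete, here is a rough sketch of a subclass that overrides `loadInitialData` to pre-populate the cache. `SketchLoader` and `PreloadingLoader` are hypothetical stand-ins, not the actual `JDBCLoader` or `BackendLoader` types, and the preloaded rows are hard-coded (taken from the product mapping example in the POJOEnricher Javadoc) where a real loader would query its backing store. Keys and values are lists to mirror the `ArrayList<Object>` keys and results used in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the loader in the diff: by default nothing is preloaded.
class SketchLoader
{
  public Map<Object, Object> loadInitialData()
  {
    return null;
  }
}

// Hypothetical subclass that overrides loadInitialData to warm the cache up front.
class PreloadingLoader extends SketchLoader
{
  @Override
  public Map<Object, Object> loadInitialData()
  {
    Map<Object, Object> initial = new HashMap<>();
    // Key: list of lookup field values; value: list of include field values.
    // Hard-coded here; a real loader would read these rows from its backing store.
    initial.put(new ArrayList<Object>(Arrays.asList(3)), new ArrayList<Object>(Arrays.asList(1)));
    initial.put(new ArrayList<Object>(Arrays.asList(4)), new ArrayList<Object>(Arrays.asList(10)));
    return initial;
  }
}
```

Whether preloading is worthwhile depends on the table size, which is presumably why the base loader leaves it out.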


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60209083
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.db.cache.CacheManager;
    +import com.datatorrent.lib.db.cache.CacheStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +
    +/**
    + * Base class for Enrichment Operator.&nbsp; Subclasses should provide implementation to getKey and convert.
    + * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
    + * <p>
    + * Properties:<br>
    + * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
    + * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
    + * <b>store</b>: Specify the type of loader for looking data<br>
    + * <br>
    + *
    + * @param <INPUT>  Type of tuples which are received by this operator</T>
    + * @param <OUTPUT> Type of tuples which are emitted by this operator</T>
    + * @displayName Abstract Enrichment Operator
    + * @tags Enrichment
    + */
    +@InterfaceStability.Evolving
    +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
    +{
    +  /**
    +   * Mandatory parameters for Enricher
    +   */
    +  @NotNull
    +  protected List<String> lookupFields;
    +  @NotNull
    +  protected List<String> includeFields;
    +  @NotNull
    +  private BackendLoader store;
    +
    +  /**
    +   * Optional parameters for enricher.
    +   */
    +  private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
    +  private int cacheCleanupInterval = 24 * 60 * 60 * 1000;
    +  private int cacheSize = 1024;
    +
    +  /**
    +   * Helper variables.
    +   */
    +  private transient CacheManager cacheManager;
    +  protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
    +  protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
    +
    +  /**
    +   * This method needs to be called by implementing class for processing a tuple for enrichment.
    +   * The method will take th tuple through following stages:
    +   * <ol>
    +   * <li>Call {@link #getKey(Object)} to retrieve key fields for lookup</li>
    +   * <li>Using key fields call cache manager to retrieve for any key that is cached already</li>
    +   * <li>If not found in cache, it'll do a lookup in configured backend store</li>
    +   * <li>The retrieved fields will be passed to {@link #convert(Object, Object)} method to create the final object</li>
    +   * <li>Finally {@link #emitTuple(Object)} is called for emitting the tuple</li>
    +   * </ol>
    +   *
    +   * @param tuple Input tuple that needs to get processed for enrichment.
    +   */
    +  protected void enrichTuple(INPUT tuple)
    +  {
    +    Object key = getKey(tuple);
    +    if (key != null) {
    +      Object result = cacheManager.get(key);
    +      OUTPUT out = convert(tuple, result);
    +      emitTuple(out);
    +    }
    +  }
    +
    +  /**
    +   * The method should be implemented by concrete class which returns an ArrayList<Object> containing all the fields
    +   * which forms key part of lookup.
    +   * The order of field values should be same as the one set in {@link #lookupFields} variable.
    +   *
    +   * @param tuple Input tuple from which fields values for key needs to be fetched.
    +   * @return Should return ArrayList<Object> which has fields values forming keys in same order as {@link #lookupFields}
    +   */
    +  protected abstract Object getKey(INPUT tuple);
    +
    +  /**
    +   * The method should be implemented by concrete class.
    +   * This method is expected to take input tuple and a externally fetched object contained field to be enriched, and
    +   * return a Enriched tuple which is ready to be emitted.
    --- End diff --
    
    Typo: "a Enriched" should be "an Enriched".


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60872509
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java ---
    @@ -0,0 +1,103 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URL;
    +import java.util.Arrays;
    +import java.util.Map;
    +
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +
    +import org.apache.commons.io.FileUtils;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.google.common.collect.Maps;
    +import com.datatorrent.lib.testbench.CollectorTestSink;
    +import com.datatorrent.lib.util.TestUtils;
    +
    +public class FileEnrichmentTest
    +{
    +
    +  @Rule
    +  public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();
    +
    +  @Test
    +  public void testEnrichmentOperator() throws IOException, InterruptedException
    +  {
    +    URL origUrl = this.getClass().getResource("/productmapping.txt");
    +
    +    URL fileUrl = new URL(this.getClass().getResource("/").toString() + "productmapping1.txt");
    +    FileUtils.deleteQuietly(new File(fileUrl.getPath()));
    +    FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath()));
    +
    +    MapEnricher oper = new MapEnricher();
    +    FSLoader store = new FSLoader();
    +    store.setFileName(fileUrl.toString());
    +    oper.setLookupFields(Arrays.asList("productId"));
    +    oper.setIncludeFields(Arrays.asList("productCategory"));
    +    oper.setStore(store);
    +
    +    oper.setup(null);
    +
    +    /* File contains 6 entries, but operator one entry is duplicate,
    +     * so cache should contains only 5 entries after scanning input file.
    +     */
    +    //Assert.assertEquals("Number of mappings ", 7, oper.cache.size());
    +
    +    CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<>();
    +    @SuppressWarnings({ "unchecked", "rawtypes" })
    +    CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
    +    oper.output.setSink(tmp);
    +
    +    oper.activate(null);
    +
    +    oper.beginWindow(0);
    +    Map<String, Object> tuple = Maps.newHashMap();
    +    tuple.put("productId", 3);
    +    tuple.put("channelId", 4);
    +    tuple.put("amount", 10.0);
    +
    +    Kryo kryo = new Kryo();
    --- End diff --
    
    Is this required?


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60869462
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java ---
    @@ -0,0 +1,60 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.lib.db.cache.CacheManager;
    +
    +/**
    + * Null Values Cache Manager. Using this NULL entries can be specified explicitly.
    + */
    +@InterfaceStability.Evolving
    +public class NullValuesCacheManager extends CacheManager
    +{
    +
    +  private static final NullObject NULL = new NullObject();
    --- End diff --
    
    What is the use case for this?
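
One plausible reading, going only by the class Javadoc quoted above, is that a sentinel object lets the cache remember that a key has no match in the backend, so repeated misses do not trigger repeated lookups. The sketch below illustrates that general pattern with a plain `HashMap`; `NullSentinelCache` and its `backendLookups` counter are hypothetical names, and this is not the `NullValuesCacheManager` or `CacheManager` implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class NullSentinelCache
{
  // Sentinel stored in place of null so "known to be absent" differs from "not cached yet".
  private static final Object NULL = new Object();

  private final Map<Object, Object> cache = new HashMap<>();
  private final Function<Object, Object> backend;

  // Counts backend calls so the effect of caching misses is observable.
  int backendLookups = 0;

  NullSentinelCache(Function<Object, Object> backend)
  {
    this.backend = backend;
  }

  Object get(Object key)
  {
    Object cached = cache.get(key);
    if (cached == null) {
      // Not cached yet: consult the backend once and remember the answer, even if it is null.
      backendLookups++;
      Object loaded = backend.apply(key);
      cache.put(key, loaded == null ? NULL : loaded);
      return loaded;
    }
    // Translate the sentinel back to null for callers.
    return cached == NULL ? null : cached;
  }
}
```

With this pattern, a second `get` for a key that the backend does not know is answered from the cache rather than by another backend call.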


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60871561
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java ---
    @@ -0,0 +1,264 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +
    +/**
    + * This class takes a POJO as input and extract the value of the lookupKey configured
    + * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
    + * specified in the file/DB or based on include fieldMap are added to original tuple.
    + * This operator is App Builder schema support enabled. <br>
    + * <p>
    + * Properties:<br>
    + * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
    + * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
    + * <br>
    + * <p>
    + * Example
    + * The file contains data in json format, one entry per line. during setup entire file is read and
    + * kept in memory for quick lookup.
    + * If file contains following lines, and operator is configured with lookup key "productId"
    + * { "productId": 1, "productCategory": 3 }
    + * { "productId": 4, "productCategory": 10 }
    + * { "productId": 3, "productCategory": 1 }
    + * <p>
    + * And input tuple is
    + * { amount=10.0, channelId=4, productId=3 }
    + * <p>
    + * The tuple is modified as below before operator emits it on output port.
    + * { amount=10.0, channelId=4, productId=3, productCategory=1 }
    + *
    + * @displayName BeanEnrichment
    + * @category Database
    + * @tags enrichment, pojo, schema, lookup
    + */
    +@InterfaceStability.Evolving
    +public class POJOEnricher extends AbstractEnricher<Object, Object>
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(POJOEnricher.class);
    +
    +  /**
    +   * Helper fields
    +   */
    +  protected Class<?> inputClass;
    +  protected Class<?> outputClass;
    +  private transient Map<PojoUtils.Getter, PojoUtils.Setter> fieldMap = new HashMap<>();
    +  private transient List<PojoUtils.Setter> includeSetters = new ArrayList<>();
    +  private transient List<PojoUtils.Getter> lookupGetters = new ArrayList<>();
    +
    +  @InputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +
    +    @Override
    +    public void process(Object object)
    +    {
    +      processTuple(object);
    +    }
    +  };
    +
    +  @OutputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
    +  {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +  };
    +
    +  protected void processTuple(Object object)
    +  {
    +    enrichTuple(object);
    +  }
    +
    +  @Override
    +  protected Object getKey(Object tuple)
    +  {
    +    ArrayList<Object> keyList = new ArrayList<>();
    +    for (PojoUtils.Getter lookupGetter : lookupGetters) {
    +      keyList.add(lookupGetter.get(tuple));
    +    }
    +    return keyList;
    +  }
    +
    +  @Override
    +  protected Object convert(Object in, Object cached)
    +  {
    +    Object o;
    +
    +    try {
    +      o = outputClass.newInstance();
    +    } catch (InstantiationException | IllegalAccessException e) {
    +      logger.error("Failed to create new instance of output POJO", e);
    +      return null;
    +    }
    +
    +    for (Map.Entry<PojoUtils.Getter, PojoUtils.Setter> entry : fieldMap.entrySet()) {
    +      entry.getValue().set(o, entry.getKey().get(in));
    +    }
    +
    +    if (cached == null) {
    +      return null;
    +    }
    +
    +    ArrayList<Object> includeObjects = (ArrayList<Object>)cached;
    +    int idx = 0;
    +    for (PojoUtils.Setter includeSetter : includeSetters) {
    +      try {
    +        includeSetter.set(o, includeObjects.get(idx++));
    +      } catch (RuntimeException e) {
    +        logger.error("Failed to set the property. Continuing with default.", e);
    +      }
    +    }
    +
    +    return o;
    +  }
    +
    +  @Override
    +  protected void emitTuple(Object tuple)
    +  {
    +    output.emit(tuple);
    +  }
    +
    +  @Override
    +  protected Class<?> getIncludeFieldType(String fieldName)
    +  {
    +    try {
    +      return outputClass.getDeclaredField(fieldName).getType();
    +    } catch (NoSuchFieldException e) {
    +      logger.warn("Failed to find given fieldName, returning object type", e);
    +      return Object.class;
    +    }
    +  }
    +
    +  @Override
    +  protected Class<?> getLookupFieldType(String fieldName)
    +  {
    +    try {
    +      return inputClass.getDeclaredField(fieldName).getType();
    +    } catch (NoSuchFieldException e) {
    +      logger.warn("Failed to find given fieldName, returning object type", e);
    +      return Object.class;
    +    }
    +  }
    +
    +  @SuppressWarnings({ "unchecked", "rawtypes" })
    +  private PojoUtils.Setter generateSettersForField(Class<?> klass, String outputFieldName)
    +      throws NoSuchFieldException, SecurityException
    +  {
    +    Field f = outputClass.getDeclaredField(outputFieldName);
    +    Class c = ClassUtils.primitiveToWrapper(f.getType());
    +    PojoUtils.Setter classSetter = PojoUtils.createSetter(klass, outputFieldName, c);
    --- End diff --
    
    Return the createSetter(...) result directly; the classSetter local variable is not needed.


---