You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by prasi-in <gi...@git.apache.org> on 2015/12/15 10:55:28 UTC

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

GitHub user prasi-in opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131

    Mlhr 1942.geode

    Operator Support for Apache Geode(http://geode.incubator.apache.org/) .
    + Geode Store 
    + Abstract Implementations of Input and Output Operators.
    + POJOOutputOperartor.
    + Corresponding tests.
    + Adding POM dependencies for getting Apache geode dependencies.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/prasi-in/incubator-apex-malhar MLHR-1942.geode

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #131
    
----
commit e29915f3f2607e57f7f8ae78564e612162cc0454
Author: Prasad Indulkar <pr...@ampool.io>
Date:   2015-12-15T09:28:09Z

    MLHR-1942 Adding the geode store implementation and test.

commit 9ab3fe2ea08a90b6290323aa3b41edf7c1f14dfb
Author: Prasad Indulkar <pr...@ampool.io>
Date:   2015-12-15T09:31:25Z

    MLHR-1942 Abstract input/output operator and concrete POJO output operator. Adding geode dependency in pom.

commit 47e2be54ca1f80dc69867e170ecbf442c79c71b4
Author: Prasad Indulkar <pr...@ampool.io>
Date:   2015-12-15T09:49:43Z

    MLHR-1942 Correcting the style checks and package dependencies.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48474210
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    +    this.clientCache = clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null && clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  /**
    +   * @param region
    +   *          the region to set
    +   */
    +  public void setRegion(Region<Object, Object> region) throws IOException
    +  {
    +    this.region = region;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    region = clientCache.getRegion(getRegionName());
    +
    +    if (region == null) {
    +      region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(
    +          getRegionName());
    +    }
    +
    +  }
    +
    +  @Override
    +  public void disconnect() throws IOException
    +  {
    +    clientCache.close();
    +
    +  }
    +
    +  @Override
    +  public boolean isConnected()
    +  {
    +    return (clientCache.isClosed());
    +
    +  }
    +
    +  /**
    +   * Gets the value given the key. Note that it does NOT work with hash values
    +   * or list values
    +   *
    +   * @param key
    +   * @return The value.
    +   */
    +  @Override
    +  public Object get(Object key)
    +  {
    +
    +    try {
    +      return (getRegion().get(key));
    +    } catch (IOException ex) {
    +      LOG.info("error getting object ", ex);
    +      return null;
    +    }
    +
    +  }
    +
    +  /**
    +   * Gets all the values given the keys. Note that it does NOT work with hash
    +   * values or list values
    +   *
    +   * @param keys
    +   * @return All values for the given keys.
    +   */
    +  @SuppressWarnings("unchecked")
    +  @Override
    +  public List<Object> getAll(List<Object> keys)
    +  {
    +
    +    List<Object> values = new ArrayList<Object>();
    +
    +    try {
    +      final Map<Object, Object> entries = getRegion().getAll(keys);
    +      for (int i = 0; i < keys.size(); i++) {
    --- End diff --
    
    Iterate through entries instead


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48879075
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    --- End diff --
    
    Throw exception instead of logging it


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48880616
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    +    this.clientCache = clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null || clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  /**
    +   * @param region
    +   *          the region to set
    +   */
    +  public void setRegion(Region<Object, Object> region) throws IOException
    --- End diff --
    
    Since region is marked as transient, the value set by this setter will not be retained post serialization and de-serialization. I don't think setter is required for this property


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/131


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48880841
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    +    this.clientCache = clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null || clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  /**
    +   * @param region
    +   *          the region to set
    +   */
    +  public void setRegion(Region<Object, Object> region) throws IOException
    +  {
    +    this.region = region;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    --- End diff --
    
    why use getters for locatorhost and locatorport and not use them directly?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48473930
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    +    this.clientCache = clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null && clientCache.isClosed()) {
    --- End diff --
    
    possible null pointer exception ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48880762
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    +    this.clientCache = clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null || clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  /**
    +   * @param region
    +   *          the region to set
    +   */
    +  public void setRegion(Region<Object, Object> region) throws IOException
    +  {
    +    this.region = region;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (Exception ex) {
    --- End diff --
    
    Why catch generic Exception??


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r50760052
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,298 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.accumulo.core.client.impl.thrift.ThriftTest.Processor.throwsError;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + * Geode is a distributed in-memory database
    + *  that provides reliable asynchronous event notifications and guaranteed message delivery.
    + * Geode is a data management platform that provides real-time
    + * , consistent access to data-intensive applications.
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger logger = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      throw new RuntimeException("Exception while creating cache", ex);
    +
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null || clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (Exception ex) {
    +      throw new RuntimeException(ex);
    +    }
    +
    +    region = clientCache.getRegion(getRegionName());
    +
    +    if (region == null) {
    +      region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(
    +          getRegionName());
    +    }
    +
    +  }
    +
    +  @Override
    +  public void disconnect() throws IOException
    +  {
    +    clientCache.close();
    +
    +  }
    +
    +  @Override
    +  public boolean isConnected()
    +  {
    +    return (clientCache.isClosed());
    +
    +  }
    +
    +  /**
    +   * Gets the value given the key. Note that it does NOT work with hash values
    +   * or list values
    +   *
    +   * @param key
    +   * @return The value.
    +   */
    +  @Override
    +  public Object get(Object key)
    +  {
    +
    +    try {
    +      return (getRegion().get(key));
    +    } catch (IOException ex) {
    +      throw new RuntimeException("Exception while getting the object", ex);      
    +
    +    }
    +
    +  }
    +
    +  /**
    +   * Gets all the values given the keys. Note that it does NOT work with hash
    +   * values or list values
    +   *
    +   * @param keys
    +   * @return All values for the given keys.
    +   */
    +  @SuppressWarnings("unchecked")
    +  @Override
    +  public List<Object> getAll(List<Object> keys)
    +  {
    +
    +    List<Object> values = new ArrayList<Object>();
    +
    +    try {
    +      final Map<Object, Object> entries = getRegion().getAll(keys);
    +      for (int i = 0; i < keys.size(); i++) {
    +        values.add(entries.get(keys.get(i)));
    +      }
    +    } catch (IOException ex) {
    +      logger.info("error getting region ", ex);
    --- End diff --
    
    why swallow exception?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r50761895
  
    --- Diff: contrib/pom.xml ---
    @@ -623,5 +629,10 @@
           <artifactId>super-csv-joda</artifactId>
           <version>2.3.1</version>
         </dependency>
    +     <dependency>
    +      <groupId>com.gemstone.gemfire</groupId>
    +      <artifactId>gemfire</artifactId>
    +      <version>8.2.0</version>
    --- End diff --
    
    Please update the dependency consistent to #125


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r52194806
  
    --- Diff: contrib/pom.xml ---
    @@ -53,6 +53,18 @@
             <updatePolicy>daily</updatePolicy>
           </releases>
         </repository>
    +    <repository>
    --- End diff --
    
    http://mvnrepository.com/artifact/org.apache.geode/gemfire-core/1.0.0-incubating.M1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48881232
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    --- End diff --
    
    Please add more details about the Store like properties etc


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48473514
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java ---
    @@ -0,0 +1,39 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
    +
    +/**
    + * This is the base implementation used for geode input adapters.&nbsp; A
    + * concrete operator should be created from this skeleton implementation.
    + * <p>
    + * </p>
    + * 
    + * @displayName Abstract Geode Input
    + * @category Input
    + * @tags geode, key value
    + *
    + * @param <T>
    + *          The tuple type.
    + * 
    + */
    +public abstract class AbstractGeodeInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, GeodeStore>
    +{
    --- End diff --
    
    Should we consider initializing the store here ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48880540
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    --- End diff --
    
    Since clientCache is marked as transient, the value set by this setter will not be retained post serialization and deserialization. I don't think setter is required for this property


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48881047
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    +    this.clientCache = clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null || clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  /**
    +   * @param region
    +   *          the region to set
    +   */
    +  public void setRegion(Region<Object, Object> region) throws IOException
    +  {
    +    this.region = region;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (Exception ex) {
    +      throw new RuntimeException(ex);
    +    }
    +
    +    region = clientCache.getRegion(getRegionName());
    +
    +    if (region == null) {
    +      region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(
    +          getRegionName());
    +    }
    +
    +  }
    +
    +  @Override
    +  public void disconnect() throws IOException
    +  {
    +    clientCache.close();
    +
    +  }
    +
    +  @Override
    +  public boolean isConnected()
    +  {
    +    return (clientCache.isClosed());
    +
    +  }
    +
    +  /**
    +   * Gets the value given the key. Note that it does NOT work with hash values
    +   * or list values
    +   *
    +   * @param key
    +   * @return The value.
    +   */
    +  @Override
    +  public Object get(Object key)
    +  {
    +
    +    try {
    +      return (getRegion().get(key));
    --- End diff --
    
    again same.. why getters for local variables?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r50760108
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,298 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.accumulo.core.client.impl.thrift.ThriftTest.Processor.throwsError;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + * Geode is a distributed in-memory database
    + *  that provides reliable asynchronous event notifications and guaranteed message delivery.
    + * Geode is a data management platform that provides real-time
    + * , consistent access to data-intensive applications.
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger logger = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      throw new RuntimeException("Exception while creating cache", ex);
    +
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null || clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (Exception ex) {
    +      throw new RuntimeException(ex);
    +    }
    +
    +    region = clientCache.getRegion(getRegionName());
    +
    +    if (region == null) {
    +      region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(
    +          getRegionName());
    +    }
    +
    +  }
    +
    +  @Override
    +  public void disconnect() throws IOException
    +  {
    +    clientCache.close();
    +
    +  }
    +
    +  @Override
    +  public boolean isConnected()
    +  {
    +    return (clientCache.isClosed());
    +
    +  }
    +
    +  /**
    +   * Gets the value given the key. Note that it does NOT work with hash values
    +   * or list values
    +   *
    +   * @param key
    +   * @return The value.
    +   */
    +  @Override
    +  public Object get(Object key)
    +  {
    +
    +    try {
    +      return (getRegion().get(key));
    +    } catch (IOException ex) {
    +      throw new RuntimeException("Exception while getting the object", ex);      
    +
    +    }
    +
    +  }
    +
    +  /**
    +   * Gets all the values given the keys. Note that it does NOT work with hash
    +   * values or list values
    +   *
    +   * @param keys
    +   * @return All values for the given keys.
    +   */
    +  @SuppressWarnings("unchecked")
    +  @Override
    +  public List<Object> getAll(List<Object> keys)
    +  {
    +
    +    List<Object> values = new ArrayList<Object>();
    +
    +    try {
    +      final Map<Object, Object> entries = getRegion().getAll(keys);
    +      for (int i = 0; i < keys.size(); i++) {
    +        values.add(entries.get(keys.get(i)));
    +      }
    +    } catch (IOException ex) {
    +      logger.info("error getting region ", ex);
    +    }
    +
    +    return (values);
    +
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +
    +  public Map<Object, Object> getAllMap(List<Object> keys)
    +  {
    +
    +    try {
    +      final Map<Object, Object> entries = getRegion().getAll(keys);
    +      return (entries);
    +    } catch (IOException ex) {
    +      logger.info("error getting object ", ex);
    --- End diff --
    
    why swallowing exception?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48878685
  
    --- Diff: contrib/pom.xml ---
    @@ -623,5 +629,10 @@
           <artifactId>super-csv-joda</artifactId>
           <version>2.3.1</version>
         </dependency>
    +     <dependency>
    +      <groupId>com.gemstone.gemfire</groupId>
    +      <artifactId>gemfire</artifactId>
    +      <version>8.2.0</version>
    --- End diff --
    
    Mark the dependency as optional


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48880930
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    +    this.clientCache = clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null || clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  /**
    +   * @param region
    +   *          the region to set
    +   */
    +  public void setRegion(Region<Object, Object> region) throws IOException
    +  {
    +    this.region = region;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (Exception ex) {
    +      throw new RuntimeException(ex);
    +    }
    +
    +    region = clientCache.getRegion(getRegionName());
    --- End diff --
    
    why getter for RegionName and not direct use of variable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48881384
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    +    this.clientCache = clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null || clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  /**
    +   * @param region
    +   *          the region to set
    +   */
    +  public void setRegion(Region<Object, Object> region) throws IOException
    +  {
    +    this.region = region;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (Exception ex) {
    +      throw new RuntimeException(ex);
    +    }
    +
    +    region = clientCache.getRegion(getRegionName());
    +
    +    if (region == null) {
    +      region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(
    +          getRegionName());
    +    }
    +
    +  }
    +
    +  @Override
    +  public void disconnect() throws IOException
    +  {
    +    clientCache.close();
    +
    +  }
    +
    +  @Override
    +  public boolean isConnected()
    +  {
    +    return (clientCache.isClosed());
    +
    +  }
    +
    +  /**
    +   * Gets the value given the key. Note that it does NOT work with hash values
    +   * or list values
    +   *
    +   * @param key
    +   * @return The value.
    +   */
    +  @Override
    +  public Object get(Object key)
    +  {
    +
    +    try {
    +      return (getRegion().get(key));
    +    } catch (IOException ex) {
    +      LOG.info("error getting object ", ex);
    +      return null;
    +    }
    +
    +  }
    +
    +  /**
    +   * Gets all the values given the keys. Note that it does NOT work with hash
    +   * values or list values
    +   *
    +   * @param keys
    +   * @return All values for the given keys.
    +   */
    +  @SuppressWarnings("unchecked")
    +  @Override
    +  public List<Object> getAll(List<Object> keys)
    +  {
    +
    +    List<Object> values = new ArrayList<Object>();
    +
    +    try {
    +      final Map<Object, Object> entries = getRegion().getAll(keys);
    +      for (int i = 0; i < keys.size(); i++) {
    +        values.add(entries.get(keys.get(i)));
    +      }
    +    } catch (IOException ex) {
    +      LOG.info("error getting region ", ex);
    +    }
    +
    +    return (values);
    +
    +  }
    +
    +  public Map<Object, Object> getallMap(List<Object> keys)
    --- End diff --
    
    getallMap => getAllMap


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48878896
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    --- End diff --
    
    can variable be renamed as `logger`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r52192282
  
    --- Diff: contrib/pom.xml ---
    @@ -53,6 +53,18 @@
             <updatePolicy>daily</updatePolicy>
           </releases>
         </repository>
    +    <repository>
    --- End diff --
    
    This should not be required, wasn't there a Geode release a few days ago?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48881482
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    +    this.clientCache = clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null || clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  /**
    +   * @param region
    +   *          the region to set
    +   */
    +  public void setRegion(Region<Object, Object> region) throws IOException
    +  {
    +    this.region = region;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (Exception ex) {
    +      throw new RuntimeException(ex);
    +    }
    +
    +    region = clientCache.getRegion(getRegionName());
    +
    +    if (region == null) {
    +      region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(
    +          getRegionName());
    +    }
    +
    +  }
    +
    +  @Override
    +  public void disconnect() throws IOException
    +  {
    +    clientCache.close();
    +
    +  }
    +
    +  @Override
    +  public boolean isConnected()
    +  {
    +    return (clientCache.isClosed());
    +
    +  }
    +
    +  /**
    +   * Gets the value given the key. Note that it does NOT work with hash values
    +   * or list values
    +   *
    +   * @param key
    +   * @return The value.
    +   */
    +  @Override
    +  public Object get(Object key)
    +  {
    +
    +    try {
    +      return (getRegion().get(key));
    +    } catch (IOException ex) {
    +      LOG.info("error getting object ", ex);
    +      return null;
    +    }
    +
    +  }
    +
    +  /**
    +   * Gets all the values given the keys. Note that it does NOT work with hash
    +   * values or list values
    +   *
    +   * @param keys
    +   * @return All values for the given keys.
    +   */
    +  @SuppressWarnings("unchecked")
    +  @Override
    +  public List<Object> getAll(List<Object> keys)
    +  {
    +
    +    List<Object> values = new ArrayList<Object>();
    +
    +    try {
    +      final Map<Object, Object> entries = getRegion().getAll(keys);
    +      for (int i = 0; i < keys.size(); i++) {
    +        values.add(entries.get(keys.get(i)));
    +      }
    +    } catch (IOException ex) {
    +      LOG.info("error getting region ", ex);
    +    }
    +
    +    return (values);
    +
    +  }
    +
    +  public Map<Object, Object> getallMap(List<Object> keys)
    +  {
    +
    +    try {
    +      final Map<Object, Object> entries = getRegion().getAll(keys);
    +      return (entries);
    +    } catch (IOException ex) {
    +      LOG.info("error getting object ", ex);
    +      return null;
    +    }
    +
    +  }
    +
    +  @SuppressWarnings("rawtypes")
    +  public SelectResults query(String predicate)
    +  {
    +    try {
    +      return (getRegion().query(predicate));
    +    } catch (FunctionDomainException | TypeMismatchException | NameResolutionException | QueryInvocationTargetException
    +        | IOException e) {
    +      e.printStackTrace();
    --- End diff --
    
    e.printStackTrace??


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48879266
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    --- End diff --
    
    I see as getting used only at one place `getRegion` and it has only one statement. Why another function?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by prasi-in <gi...@git.apache.org>.
Github user prasi-in commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48597503
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    +    this.clientCache = clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null && clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  /**
    +   * @param region
    +   *          the region to set
    +   */
    +  public void setRegion(Region<Object, Object> region) throws IOException
    +  {
    +    this.region = region;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    region = clientCache.getRegion(getRegionName());
    +
    +    if (region == null) {
    +      region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(
    +          getRegionName());
    +    }
    +
    +  }
    +
    +  @Override
    +  public void disconnect() throws IOException
    +  {
    +    clientCache.close();
    +
    +  }
    +
    +  @Override
    +  public boolean isConnected()
    +  {
    +    return (clientCache.isClosed());
    +
    +  }
    +
    +  /**
    +   * Gets the value given the key. Note that it does NOT work with hash values
    +   * or list values
    +   *
    +   * @param key
    +   * @return The value.
    +   */
    +  @Override
    +  public Object get(Object key)
    +  {
    +
    +    try {
    +      return (getRegion().get(key));
    +    } catch (IOException ex) {
    +      LOG.info("error getting object ", ex);
    +      return null;
    +    }
    +
    +  }
    +
    +  /**
    +   * Gets all the values given the keys. Note that it does NOT work with hash
    +   * values or list values
    +   *
    +   * @param keys
    +   * @return All values for the given keys.
    +   */
    +  @SuppressWarnings("unchecked")
    +  @Override
    +  public List<Object> getAll(List<Object> keys)
    +  {
    +
    +    List<Object> values = new ArrayList<Object>();
    +
    +    try {
    +      final Map<Object, Object> entries = getRegion().getAll(keys);
    +      for (int i = 0; i < keys.size(); i++) {
    --- End diff --
    
    The method expects the values to be returned in the same order as the keys are passed. However apache geode returns a map of key values. Hence to maintain the semantics have iterated the entries and returned just the values in the order of the keys passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48880946
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    +    this.clientCache = clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null || clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  /**
    +   * @param region
    +   *          the region to set
    +   */
    +  public void setRegion(Region<Object, Object> region) throws IOException
    +  {
    +    this.region = region;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (Exception ex) {
    +      throw new RuntimeException(ex);
    +    }
    +
    +    region = clientCache.getRegion(getRegionName());
    +
    +    if (region == null) {
    +      region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(
    +          getRegionName());
    --- End diff --
    
    same as above


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48881103
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    +    this.clientCache = clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null || clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  /**
    +   * @param region
    +   *          the region to set
    +   */
    +  public void setRegion(Region<Object, Object> region) throws IOException
    +  {
    +    this.region = region;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (Exception ex) {
    +      throw new RuntimeException(ex);
    +    }
    +
    +    region = clientCache.getRegion(getRegionName());
    +
    +    if (region == null) {
    +      region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(
    +          getRegionName());
    +    }
    +
    +  }
    +
    +  @Override
    +  public void disconnect() throws IOException
    +  {
    +    clientCache.close();
    +
    +  }
    +
    +  @Override
    +  public boolean isConnected()
    +  {
    +    return (clientCache.isClosed());
    +
    +  }
    +
    +  /**
    +   * Gets the value given the key. Note that it does NOT work with hash values
    +   * or list values
    +   *
    +   * @param key
    +   * @return The value.
    +   */
    +  @Override
    +  public Object get(Object key)
    +  {
    +
    +    try {
    +      return (getRegion().get(key));
    +    } catch (IOException ex) {
    +      LOG.info("error getting object ", ex);
    --- End diff --
    
    Why catch exception and not throw it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Mlhr 1942.geode

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/131#discussion_r48474276
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---
    @@ -0,0 +1,308 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.geode;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.gemstone.gemfire.cache.CacheClosedException;
    +import com.gemstone.gemfire.cache.CacheWriterException;
    +import com.gemstone.gemfire.cache.EntryNotFoundException;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.TimeoutException;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.query.FunctionDomainException;
    +import com.gemstone.gemfire.cache.query.NameResolutionException;
    +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.gemstone.gemfire.cache.query.TypeMismatchException;
    +
    +import com.datatorrent.lib.db.KeyValueStore;
    +
    +/**
    + * Provides the implementation of a Geode store.
    + *
    + * 
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +  /**
    +   * 
    +   */
    +  private static final long serialVersionUID = -5076452548893319967L;
    +  private static final Logger LOG = LoggerFactory.getLogger(GeodeStore.class);
    +  private transient ClientCache clientCache = null;
    +  private transient Region<Object, Object> region = null;
    +  private String locatorHost;
    +  private int locatorPort;
    +  private String regionName;
    +
    +  private ClientCache initClient()
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @return the regionName
    +   */
    +  public String getRegionName()
    +  {
    +    return regionName;
    +  }
    +
    +  /**
    +   * @param regionName
    +   *          the regionName to set
    +   */
    +  public void setRegionName(String regionName)
    +  {
    +    this.regionName = regionName;
    +  }
    +
    +  /**
    +   * @return the clientCache
    +   */
    +  public ClientCache getClientCache()
    +  {
    +    return clientCache;
    +  }
    +
    +  /**
    +   * @param clientCache
    +   *          the clientCache to set
    +   */
    +  public void setClientCache(ClientCache clientCache)
    +  {
    +    this.clientCache = clientCache;
    +  }
    +
    +  /**
    +   * @return the locatorPort
    +   */
    +  public int getLocatorPort()
    +  {
    +    return locatorPort;
    +  }
    +
    +  /**
    +   * @param locatorPort
    +   *          the locatorPort to set
    +   */
    +  public void setLocatorPort(int locatorPort)
    +  {
    +    this.locatorPort = locatorPort;
    +  }
    +
    +  /**
    +   * @return the locatorHost
    +   */
    +  public String getLocatorHost()
    +  {
    +    return locatorHost;
    +  }
    +
    +  /**
    +   * @param locatorHost
    +   *          the locatorHost to set
    +   */
    +  public void setLocatorHost(String locatorHost)
    +  {
    +    this.locatorHost = locatorHost;
    +  }
    +
    +  /**
    +   * @return the region
    +   * @throws IOException
    +   */
    +  public Region<Object, Object> getRegion() throws IOException
    +  {
    +    // return region;
    +    if (clientCache == null && clientCache.isClosed()) {
    +      initClient();
    +    }
    +
    +    if (region == null) {
    +      region = clientCache.getRegion(regionName);
    +      if (region == null) {
    +        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  /**
    +   * @param region
    +   *          the region to set
    +   */
    +  public void setRegion(Region<Object, Object> region) throws IOException
    +  {
    +    this.region = region;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    try {
    +      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
    +    } catch (CacheClosedException ex) {
    +      LOG.info("error initiating client ", ex);
    +    }
    +
    +    region = clientCache.getRegion(getRegionName());
    +
    --- End diff --
    
    This can throw NPE if CacheClosedException is thrown during initialization of clientCache


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---