Posted to dev@apex.apache.org by ashishtadose <gi...@git.apache.org> on 2015/12/09 16:22:49 UTC

[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

GitHub user ashishtadose opened a pull request:

    https://github.com/apache/incubator-apex-core/pull/184

    APEXCORE-283 - checkpointing in distributed in-memory store

    @tushargosavi
    
    Can you please review the changes?
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ashishtadose/incubator-apex-core APEXCORE-283.checkpoint

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-core/pull/184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #184
    
----
commit 1988d460e921dc82249d60c0bbe5a1f1b5913910
Author: Ashish <as...@ampool.io>
Date:   2015-12-09T14:45:27Z

    APEXCORE-283 #comment Interface for storage agent to retrieve application id

commit f23c6814bf2964dc46d50f60ad93e4faa94b3880
Author: Ashish <as...@ampool.io>
Date:   2015-12-09T14:48:44Z

    APEXCORE-283 #comment Abstract implementation of KeyValue storage agent which can be configured with implementation of KeyValue store for checkpointing.

commit f092f046810acfaeb00955ffcde6a3e6c27cff4d
Author: Ashish <as...@ampool.io>
Date:   2015-12-09T14:53:02Z

    APEXCORE-283 #comment Concrete implementation of In memory storage agent for Apache Geode

commit ab110cb0e77bfd50991fd16abc01fada7201e781
Author: Ashish <as...@ampool.io>
Date:   2015-12-09T15:11:00Z

    APEXCORE-283 #comment Updated AppIdAwareStorage agent changes in StramLocalCluster

commit aae555660022051a367f7be822833098f8dcc8fb
Author: Ashish <as...@ampool.io>
Date:   2015-12-09T15:14:10Z

    APEXCORE-283 #comment Excluded Geode storage related test cases as it requires some manual setup

commit 142f0212eccda92dca90569f79d9c8a5a9874439
Author: Ashish <as...@ampool.io>
Date:   2015-12-09T15:15:42Z

    APEXCORE-283 #comment fixed checkstyle fixes causing build failures

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tushargosavi <gi...@git.apache.org>.
Github user tushargosavi commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47125390
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/GeodeStore.java ---
    @@ -0,0 +1,324 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import com.gemstone.gemfire.cache.Region;
    +import com.gemstone.gemfire.cache.client.ClientCache;
    +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
    +import com.gemstone.gemfire.cache.execute.Execution;
    +import com.gemstone.gemfire.cache.execute.FunctionService;
    +import com.gemstone.gemfire.cache.query.Query;
    +import com.gemstone.gemfire.cache.query.QueryService;
    +import com.gemstone.gemfire.cache.query.SelectResults;
    +import com.google.common.collect.Maps;
    +
    +import com.datatorrent.api.KeyValueStore;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Geode Store implementation of {@link KeyValueStore} Uses {@link Kryo}
    + * serialization to store retrieve objects
    + * 
    + * @since 3.2.0
    + *
    + */
    +public class GeodeStore implements KeyValueStore, Serializable
    +{
    +
    +  public static final String GET_KEYS_QUERY = "SELECT entry.key FROM /$[region}.entries entry WHERE entry.key LIKE '${operator.id}%'";
    +
    +  private String geodeLocators;
    +  private String geodeRegionName;
    +
    +  public String getGeodeRegionName()
    +  {
    +    return geodeRegionName;
    +  }
    +
    +  public void setGeodeRegionName(String geodeRegionName)
    +  {
    +    this.geodeRegionName = geodeRegionName;
    +  }
    +
    +  protected transient Kryo kryo;
    +
    +  public GeodeStore()
    +  {
    +    geodeLocators = null;
    +    kryo = null;
    +  }
    +
    +  /**
    +   * Initializes Geode store by using locator connection string
    +   * 
    +   * @param locatorString
    +   */
    +  public GeodeStore(String locatorString)
    +  {
    +    this.geodeLocators = locatorString;
    +    kryo = new Kryo();
    +  }
    +
    +  private Kryo getKyro()
    +  {
    +    if (kryo == null) {
    +      kryo = new Kryo();
    +    }
    +    return kryo;
    +  }
    +
    +  /**
    +   * Get the Geode locator connection string
    +   * 
    +   * @return locator connection string
    +   */
    +  public String getGeodeLocators()
    +  {
    +    return geodeLocators;
    +  }
    +
    +  /**
    +   * Sets the Geode locator string
    +   * 
    +   * @param geodeLocators
    +   */
    +  public void setGeodeLocators(String geodeLocators)
    +  {
    +    this.geodeLocators = geodeLocators;
    +  }
    +
    +  private transient ClientCache clientCache = null;
    +  private transient Region<String, byte[]> region = null;
    +
    +  /**
    +   * Connect the Geode store by initializing Geode Client Cache
    +   */
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    ClientCacheFactory factory = new ClientCacheFactory();
    +    Map<String, String> locators = parseLocatorString(geodeLocators);
    +
    +    if (locators.size() == 0) {
    +      throw new IllegalArgumentException("Invalid locator connection string " + geodeLocators);
    +    } else {
    +      for (Entry<String, String> entry : locators.entrySet()) {
    +        factory.addPoolLocator(entry.getKey(), Integer.valueOf(entry.getValue()));
    +      }
    +    }
    +    clientCache = factory.create();
    +  }
    +
    +  private Region<String, byte[]> getGeodeRegion() throws IOException
    +  {
    +    if (clientCache == null) {
    +      this.connect();
    +    }
    +    if (region == null) {
    +      region = clientCache.getRegion(geodeRegionName);
    +      if (region == null) {
    +        createRegion();
    +        region = clientCache.<String, byte[]> createClientRegionFactory(ClientRegionShortcut.PROXY)
    +            .create(geodeRegionName);
    +      }
    +    }
    +
    +    return region;
    +  }
    +
    +  /**
    +   * Creates a region
    +   * 
    +   */
    +  public synchronized void createRegion()
    +  {
    +    RegionCreateFunction atcf = new RegionCreateFunction();
    +    java.util.List<Object> inputList = new java.util.ArrayList<Object>();
    +
    +    inputList.add(geodeRegionName);
    +    inputList.add(true);
    +
    +    Execution members = FunctionService.onServers(clientCache.getDefaultPool()).withArgs(inputList);
    +    members.execute(atcf.getId()).getResult();
    +  }
    +
    +  /**
    +   * Disconnect the connection to Geode store by closing Client Cache connection
    +   */
    +  @Override
    +  public void disconnect() throws IOException
    +  {
    +    clientCache.close();
    +  }
    +
    +  /**
    +   * Check if store is connected to configured Geode cluster or not
    +   * 
    +   * @return True is connected to Geode cluster and client cache is active
    +   */
    +  @Override
    +  public boolean isConnected()
    +  {
    +    if (clientCache == null) {
    +      return false;
    +    }
    +    return !clientCache.isClosed();
    +  }
    +
    +  /**
    +   * Return the value for specified key from Geode region
    +   * 
    +   * @return the value object
    +   */
    +  @Override
    +  public Object get(Object key)
    +  {
    +
    +    try {
    +      byte[] obj = getGeodeRegion().get((String)key);
    +      if (obj == null) {
    +        return null;
    +      }
    +
    +      ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(obj);
    +      getKyro().setClassLoader(Thread.currentThread().getContextClassLoader());
    +      Input input = new Input(byteArrayInputStream);
    +      return getKyro().readClassAndObject(input);
    +
    +    } catch (Throwable t) {
    +      logger.error("while retrieving {} ", key, t);
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * Put given given key & value in Geode region
    +   */
    +  @Override
    +  public void put(Object key, Object value)
    +  {
    +    try {
    +      Output output = new Output(4096, Integer.MAX_VALUE);
    +      getKyro().writeClassAndObject(output, value);
    +      getGeodeRegion().put((String)key, output.getBuffer());
    +    } catch (Throwable t) {
    +      logger.error("while storing {} ", key, t);
    +    }
    +  }
    +
    +  /**
    +   * Removed the record associated for specified key from Geode region
    +   */
    +  @Override
    +  public void remove(Object key)
    +  {
    +    try {
    +      getGeodeRegion().destroy((String)key);
    +    } catch (Throwable t) {
    +      logger.info("while deleting {}", key, t);
    +    }
    +
    +  }
    +
    +  /**
    +   * Get list for keys starting from provided key name
    +   * 
    +   * @return List of keys
    +   */
    +  @Override
    +  public List<String> getKeys(Object key)
    +  {
    +    List<String> keys = null;
    +    try {
    +      keys = queryIds((int)(key));
    +      return keys;
    +    } catch (Throwable t) {
    +      logger.info("while deleting {}", key, t);
    +    }
    +    return null;
    +  }
    +
    +  public List<String> queryIds(int operatorId) throws IOException
    +  {
    +    List<String> ids = new ArrayList<>();
    +    try {
    +      QueryService queryService = clientCache.getQueryService();
    +      Query query = queryService.newQuery(
    +          GET_KEYS_QUERY.replace("$[region}", geodeRegionName).replace("${operator.id}", String.valueOf(operatorId)));
    +      logger.debug("executing query {} ", query.getQueryString());
    +//      Object[] params = new Object[1];
    --- End diff --
    
    remove commented out code.



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#issuecomment-163339700
  
    @ashishtadose The only change needed in core is the appId support. Everything else (the Geode dependencies) belongs in Malhar.



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47402814
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---
    @@ -302,6 +304,15 @@ public StramLocalCluster(LogicalPlan dag) throws IOException, ClassNotFoundExcep
         if (dag.getAttributes().get(LogicalPlan.APPLICATION_PATH) == null) {
           dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, pathUri);
         }
    +    
    +    StorageAgent agent = dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
    +    if (agent != null && agent instanceof AppIdAwareStorageAgent) {
    +      AppIdAwareStorageAgent ap = (AppIdAwareStorageAgent)dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
    +
    +      ap.saveAppId(dag.getAttributes().get(LogicalPlan.APPLICATION_ID));
    +      dag.setAttribute(OperatorContext.STORAGE_AGENT, ap);
    --- End diff --
    
    This is not needed.
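
A minimal sketch of what the comment above presumably points at: the agent fetched from the attribute map is the same instance the map already holds, so it can be updated in place and does not need to be put back. The types and string keys below are hypothetical stand-ins for the Apex classes named in the diff, not the real API.

```java
import java.util.HashMap;
import java.util.Map;

public class StorageAgentAttributeSketch {
  // Hypothetical stand-ins for the Apex types named in the diff.
  interface StorageAgent {}

  interface AppIdAwareStorageAgent extends StorageAgent {
    void saveAppId(String applicationId);
  }

  // Test double that only records the id it was given.
  static final class RecordingAgent implements AppIdAwareStorageAgent {
    String appId;

    @Override
    public void saveAppId(String applicationId) {
      this.appId = applicationId;
    }
  }

  // The attribute keys are plain strings here (an assumption made for brevity).
  static void passAppId(Map<String, Object> attributes) {
    Object agent = attributes.get("STORAGE_AGENT");
    // instanceof is false for null, so no separate null check is required.
    if (agent instanceof AppIdAwareStorageAgent) {
      ((AppIdAwareStorageAgent) agent).saveAppId((String) attributes.get("APPLICATION_ID"));
      // No second get() and no put(): the map already references this instance.
    }
  }

  public static void main(String[] args) {
    Map<String, Object> attributes = new HashMap<>();
    RecordingAgent agent = new RecordingAgent();
    attributes.put("STORAGE_AGENT", agent);
    attributes.put("APPLICATION_ID", "app_0001");
    passAppId(attributes);
    System.out.println(agent.appId);
  }
}
```

This holds only as long as the attribute map stores the agent by reference, which is the case for the `HashMap` used in this sketch.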



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#issuecomment-163366276
  
    Hi @tweise @tushargosavi,
    All review comments have been incorporated.
    Please advise on moving the code from core to Malhar.



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47129946
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java ---
    @@ -71,12 +71,10 @@ public FSStorageAgent(String path, Configuration conf)
     
    --- End diff --
    
    @tushargosavi This was causing a lot of checkstyle violations and failing the build, so I fixed it.
    Sure, I will revert it.



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#issuecomment-163450785
  
    @ashishtadose All additions except the changes related to AppIdAwareStorageAgent should go into Malhar (contrib project or separate geode project). This will decouple Geode support and Apex core.



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47217929
  
    --- Diff: api/src/main/java/com/datatorrent/api/KeyValueStore.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.api;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + * Interface for KeyValue store
    + * 
    + */
    +public interface KeyValueStore
    --- End diff --
    
    I don't think this API should be in Apex. It can go in Malhar.



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#issuecomment-163348597
  
    @tweise Agreed, it's better that way. I will keep the AppId support and the abstract storage implementation in core and move the remaining concrete implementations to Malhar.



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47592207
  
    --- Diff: api/src/main/java/com/datatorrent/api/ApplicationAwareStorageAgent.java ---
    @@ -0,0 +1,38 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.api;
    +
    +import com.datatorrent.api.Attribute.AttributeMap;
    +
    +/**
    + * Interface to pass application attributes to storage agent
    + * 
    + *
    + */
    +public interface ApplicationAwareStorageAgent extends StorageAgent
    --- End diff --
    
    Rename to StorageAgent.ApplicationAttributesAware ?



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47592139
  
    --- Diff: api/src/main/java/com/datatorrent/api/ApplicationAwareStorageAgent.java ---
    @@ -0,0 +1,38 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.api;
    +
    +import com.datatorrent.api.Attribute.AttributeMap;
    +
    +/**
    + * Interface to pass application attributes to storage agent
    + * 
    + *
    + */
    +public interface ApplicationAwareStorageAgent extends StorageAgent
    --- End diff --
    
    This interface is specific to StorageAgent and can be made a nested interface (reduces clutter in the api package).
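
The nesting suggested above can be sketched as follows. The `StorageAgent` here is a trimmed, hypothetical stand-in for `com.datatorrent.api.StorageAgent`, and the method name `setApplicationAttributes` and its `Map` parameter are assumptions made for illustration; only the nested name `StorageAgent.ApplicationAwareStorageAgent` appears later in this thread.

```java
import java.io.IOException;
import java.util.Map;

public class NestedStorageAgentSketch {
  // Trimmed, hypothetical stand-in for the real StorageAgent interface.
  interface StorageAgent {
    void save(Object object, int operatorId, long windowId) throws IOException;

    // Nested inside StorageAgent, so it adds no new top-level type to the api package.
    // It is referenced as StorageAgent.ApplicationAwareStorageAgent.
    interface ApplicationAwareStorageAgent extends StorageAgent {
      void setApplicationAttributes(Map<String, Object> attributes);
    }
  }

  // Example implementor: it names the nested interface through its enclosing type.
  static final class NoOpAgent implements StorageAgent.ApplicationAwareStorageAgent {
    Map<String, Object> seen;

    @Override
    public void save(Object object, int operatorId, long windowId) {
      // Intentionally empty: this sketch does not persist anything.
    }

    @Override
    public void setApplicationAttributes(Map<String, Object> attributes) {
      this.seen = attributes;
    }
  }

  // Call-site check, analogous to the instanceof test in the engine diff.
  static boolean isApplicationAware(StorageAgent agent) {
    return agent instanceof StorageAgent.ApplicationAwareStorageAgent;
  }

  public static void main(String[] args) {
    System.out.println(isApplicationAware(new NoOpAgent()));
  }
}
```

Agents that do not need application attributes keep implementing the plain `StorageAgent` and are unaffected by the nested type.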



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#issuecomment-165569141
  
    @tweise @gauravgopi123 The commits are squashed with the appropriate changes.
    Thanks for reviewing.



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#issuecomment-164872146
  
    Please add a test case for the new API and squash the commits.



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tushargosavi <gi...@git.apache.org>.
Github user tushargosavi commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47122916
  
    --- Diff: api/src/main/java/com/datatorrent/api/AppIdAwareStorageAgent.java ---
    @@ -0,0 +1,36 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.api;
    +
    +/**
    + * Interface to pass application id to storage agent
    + * 
    + * @since 3.2.0
    + *
    + */
    +public interface AppIdAwareStorageAgent extends StorageAgent
    +{
    +  /**
    +   * Save the application id so it can be used by StorageAgent for checkpointing
    +   * 
    +   * @param applicationId
    +   */
    +  public void saveAppId(String applicationId);
    --- End diff --
    
    Instead of just applicationId, can you pass the Attributes? The DAG attributes have the following members, which may be useful for a storage agent:
    - Application Name
    - Application Id
    - Application Path
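
As a rough illustration of the suggestion above, a key-value-backed agent that receives the whole attribute set can pick whichever members it needs, for example to namespace its checkpoint keys by application id. Everything in this sketch is hypothetical: the attribute names are plain strings, the key layout is invented for the example, and an in-memory `HashMap` stands in for the distributed store.

```java
import java.util.HashMap;
import java.util.Map;

public class AttributeAwareAgentSketch {
  // Hypothetical attribute names; in Apex the real keys are defined on the DAG context.
  static final String APPLICATION_NAME = "APPLICATION_NAME";
  static final String APPLICATION_ID = "APPLICATION_ID";
  static final String APPLICATION_PATH = "APPLICATION_PATH";

  static final class InMemoryAgent {
    private final Map<String, Object> store = new HashMap<>();
    private String appId = "unknown";

    // Receives all attributes and keeps only what this agent needs.
    void setApplicationAttributes(Map<String, String> attributes) {
      String id = attributes.get(APPLICATION_ID);
      if (id != null) {
        appId = id;
      }
    }

    // Invented key layout: <appId>/<operatorId>/<windowId in hex>.
    String keyFor(int operatorId, long windowId) {
      return appId + "/" + operatorId + "/" + Long.toHexString(windowId);
    }

    void save(Object state, int operatorId, long windowId) {
      store.put(keyFor(operatorId, windowId), state);
    }

    Object load(int operatorId, long windowId) {
      return store.get(keyFor(operatorId, windowId));
    }
  }

  public static void main(String[] args) {
    Map<String, String> attributes = new HashMap<>();
    attributes.put(APPLICATION_NAME, "demo");
    attributes.put(APPLICATION_ID, "app_0001");
    attributes.put(APPLICATION_PATH, "/tmp/demo");

    InMemoryAgent agent = new InMemoryAgent();
    agent.setApplicationAttributes(attributes);
    agent.save("operator state", 1, 255L);
    System.out.println(agent.keyFor(1, 255L) + " -> " + agent.load(1, 255L));
  }
}
```

An agent that later needs the application name or path can read it from the same map without another interface change, which is the main argument for passing attributes rather than a single id.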




[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tushargosavi <gi...@git.apache.org>.
Github user tushargosavi commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47124403
  
    --- Diff: api/src/main/java/com/datatorrent/api/AppIdAwareStorageAgent.java ---
    @@ -0,0 +1,36 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.api;
    +
    +/**
    + * Interface to pass application id to storage agent
    + * 
    + * @since 3.2.0
    + *
    + */
    +public interface AppIdAwareStorageAgent extends StorageAgent
    +{
    +  /**
    +   * Save the application id so it can be used by StorageAgent for checkpointing
    +   * 
    +   * @param applicationId
    +   */
    +  public void saveAppId(String applicationId);
    --- End diff --
    
    This will also take care of https://issues.apache.org/jira/browse/APEXCORE-61



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47402852
  
    --- Diff: common/pom.xml ---
    @@ -59,7 +59,7 @@
               <maxAllowedViolations>149</maxAllowedViolations>
             </configuration>
           </plugin>
    -    </plugins>
    --- End diff --
    
    File should not be modified.



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47955345
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StramClient.java ---
    @@ -455,6 +476,12 @@ public void startApplication() throws YarnException, IOException
           }
     
           dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, appPath.toString());
    +      StorageAgent agent = dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
    +      if (agent != null && agent instanceof StorageAgent.ApplicationAwareStorageAgent) {
    +        StorageAgent.ApplicationAwareStorageAgent ap = (StorageAgent.ApplicationAwareStorageAgent)dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
    --- End diff --
    
    This call is not needed



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47218299
  
    --- Diff: common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java ---
    @@ -104,10 +104,10 @@ public void testLoad() throws IOException
         testMeta.storageAgent.save(dataOf1, 1, 1);
    --- End diff --
    
    This only has formatting changes. Please revert it



[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#issuecomment-166670922
  
    All changes have been incorporated. 


---

[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-core/pull/184


---

[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tushargosavi <gi...@git.apache.org>.
Github user tushargosavi commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47125023
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java ---
    @@ -71,12 +71,10 @@ public FSStorageAgent(String path, Configuration conf)
     
    --- End diff --
    
    This file (FSStorageAgent.java) contains only formatting changes. Can you revert it?


---

[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#issuecomment-164631996
  
    When all other comments are addressed, please squash the commits. See (9) in
    
    http://apex.incubator.apache.org/contributing.html#opening-pull-requests-contributors- 


---

[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47218070
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.api.AppIdAwareStorageAgent;
    +import com.datatorrent.api.KeyValueStore;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured with an implementation of {@link KeyValueStore}
    + * 
    + *
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends KeyValueStore>
    --- End diff --
    
    Again, this should not be in Apex.


---

[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tushargosavi <gi...@git.apache.org>.
Github user tushargosavi commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47125681
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/GeodeKeyValueStorageAgent.java ---
    @@ -0,0 +1,65 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import java.io.Serializable;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +/**
    + * Storage Agent implementation which uses {@link GeodeStore} for operator
    + * checkpointing
    + * 
    + * @since 3.2.0
    + *
    + */
    --- End diff --
    
    I am not sure where the concrete implementation should go. My preference would be to keep the abstract implementation in Core and the specific implementation in Malhar.


---

[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#issuecomment-164043320
  
    @tushargosavi suggested making all attributes available to the storage agent instead of just the application id.


---

[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47686497
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StramClient.java ---
    @@ -455,6 +457,12 @@ public void startApplication() throws YarnException, IOException
           }
     
           dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, appPath.toString());
    +      StorageAgent agent = dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
    +      if (agent != null && agent instanceof ApplicationAwareStorageAgent) {
    +        ApplicationAwareStorageAgent ap = (ApplicationAwareStorageAgent)dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
    --- End diff --
    
    No need to make the call again to get the agent:
    ```java
    ((ApplicationAwareStorageAgent)agent).saveAppAttributes(dag.getAttributes());
    ```
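
For readers following the thread, the suggestion above amounts to looking the agent up once and casting that same reference. The following is a minimal, self-contained sketch of that pattern. The nested types are simplified stand-ins invented for illustration (a plain `Map` replaces the real `AttributeMap`, and `RecordingAgent` and `passAttributes` are hypothetical names); it is not the code that was merged.

```java
import java.util.HashMap;
import java.util.Map;

public class SingleLookupSketch {
  // Hypothetical stand-ins for the Apex types named in the diff.
  interface StorageAgent {}

  interface ApplicationAwareStorageAgent extends StorageAgent {
    void saveAppAttributes(Map<String, Object> attributes);
  }

  // Toy agent that records the attribute map it was handed.
  static class RecordingAgent implements ApplicationAwareStorageAgent {
    Map<String, Object> seen;

    @Override
    public void saveAppAttributes(Map<String, Object> attributes) {
      this.seen = attributes;
    }
  }

  // Hands the attributes to the agent if it is application-aware.
  // Returns true when the attributes were passed on.
  static boolean passAttributes(StorageAgent agent, Map<String, Object> attributes) {
    // instanceof is false for null, so a separate null check is redundant.
    if (agent instanceof ApplicationAwareStorageAgent) {
      // Reuse the reference already in hand instead of fetching it again.
      ((ApplicationAwareStorageAgent) agent).saveAppAttributes(attributes);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    Map<String, Object> attrs = new HashMap<>();
    attrs.put("APPLICATION_ID", "app-1"); // key name is an assumption
    RecordingAgent agent = new RecordingAgent();
    System.out.println(passAttributes(agent, attrs));
    System.out.println(passAttributes(null, attrs));
  }
}
```

Because `instanceof` already rejects `null`, the `agent != null &&` guard in the quoted diff could also be dropped without changing behavior.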


---

[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#issuecomment-164027438
  
    @tweise  @tushargosavi 
    
    All abstract and concrete storage agent implementations have been moved to Malhar:
    MLHR-1938 - https://github.com/apache/incubator-apex-malhar/pull/125
    
    This pull request now contains only the changes related to AppIdAwareStorageAgent.


---

[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#issuecomment-164523414
  
    @tweise The suggested changes have been incorporated.


---

[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#discussion_r47592052
  
    --- Diff: api/src/main/java/com/datatorrent/api/ApplicationAwareStorageAgent.java ---
    @@ -0,0 +1,38 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.api;
    +
    +import com.datatorrent.api.Attribute.AttributeMap;
    +
    +/**
    + * Interface to pass application attributes to storage agent
    + * 
    + *
    + */
    +public interface ApplicationAwareStorageAgent extends StorageAgent
    +{
    + 
    +  /**
    +   * Passes attributes of application to storage agent
    +   * 
    +   * @param map attributes of application
    +   */
    +  public void saveAppAttributes(AttributeMap map);
    --- End diff --
    
    Rename to setApplicationAttributes?
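
To illustrate why a storage agent might want the application attributes, here is a minimal sketch of an implementing class, using the method name proposed in this comment. Everything in it is an assumption made for illustration: the interface is a simplified stand-in that takes a plain `Map` rather than `AttributeMap`, the `APPLICATION_ID` key and the key layout are invented, and `InMemoryAgent` is a toy, not the Geode-backed agent from the pull request.

```java
import java.util.HashMap;
import java.util.Map;

public class AttributeAwareAgentSketch {
  // Hypothetical stand-in for the interface in the diff.
  interface ApplicationAwareStorageAgent {
    void setApplicationAttributes(Map<String, Object> attributes);
  }

  // Toy key-value agent that namespaces checkpoint keys by application id,
  // so two applications sharing one store do not overwrite each other.
  static class InMemoryAgent implements ApplicationAwareStorageAgent {
    private final Map<String, Object> store = new HashMap<>();
    private String appId = "unknown";

    @Override
    public void setApplicationAttributes(Map<String, Object> attributes) {
      Object id = attributes.get("APPLICATION_ID"); // key name is an assumption
      if (id != null) {
        appId = id.toString();
      }
    }

    // Builds the store key from application id, operator id and window id.
    String keyFor(int operatorId, long windowId) {
      return appId + "/" + operatorId + "/" + Long.toHexString(windowId);
    }

    void save(Object state, int operatorId, long windowId) {
      store.put(keyFor(operatorId, windowId), state);
    }

    Object load(int operatorId, long windowId) {
      return store.get(keyFor(operatorId, windowId));
    }
  }

  public static void main(String[] args) {
    Map<String, Object> attrs = new HashMap<>();
    attrs.put("APPLICATION_ID", "app-1");
    InMemoryAgent agent = new InMemoryAgent();
    agent.setApplicationAttributes(attrs);
    agent.save("operator state", 1, 255L);
    System.out.println(agent.keyFor(1, 255L));
    System.out.println(agent.load(1, 255L));
  }
}
```

Passing the whole attribute map, rather than only the application id, leaves the agent free to read other attributes later without another interface change.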


---

[GitHub] incubator-apex-core pull request: APEXCORE-283 - checkpointing in ...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/184#issuecomment-164631402
  
    A possible future enhancement could be support for the xxxAware interface for all attribute values, but this can be taken up later.


---