You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by ashishtadose <gi...@git.apache.org> on 2015/12/11 19:51:50 UTC

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

GitHub user ashishtadose opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125

    MLHR-1938 - Operator checkpointing in distributed in-memory store

    Depends on https://issues.apache.org/jira/browse/APEXCORE-283
    
    @tushargosavi kindly review.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ashishtadose/incubator-apex-malhar MLHR-1938.checkpoint

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/125.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #125
    
----
commit 1402d539a6c81689997c6668af287a82f91ffca1
Author: Ashish <as...@ampool.io>
Date:   2015-12-11T18:10:35Z

    MLHR-1938 #comment Interface for storage agent to retrieve application
    id - required to solve build issue till APEXCORE-283 is released

commit 9c0ac761e36f6610d6e2cf076048a2eaa3d5993d
Author: Ashish <as...@ampool.io>
Date:   2015-12-11T18:12:10Z

    MLHR-1938 #comment Abstract implementation of KeyValue storage agent
    which can be configured with implementation of KeyValue store for
    checkpointing.

commit a9f9470db1d9efb82022e33f2ceca05a07510858
Author: Ashish <as...@ampool.io>
Date:   2015-12-11T18:35:16Z

    MLHR-1938 #comment Concrete implementation of In memory storage agent
    for Apache Geode

commit b36125d5064ebb9c0d8f4bf455de2fccfe043b2d
Author: Ashish <as...@ampool.io>
Date:   2015-12-11T18:50:27Z

    MLHR-1938.checkpoint changed ip to localhost in test

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48616819
  
    --- Diff: contrib/pom.xml ---
    @@ -53,6 +53,11 @@
             <updatePolicy>daily</updatePolicy>
           </releases>
         </repository>
    +     <repository>
    --- End diff --
    
    Is there no Geode release in Maven central?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47786193
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured be KeyValue store witch implementation of {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked from APEX-CORE once below feature is release
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements AppIdAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    +  public static final String CHECKPOINT_KEY_SEPARATOR = "-";
    +
    +  /**
    +   * Gets the store
    +   *
    +   * @return the store
    +   */
    +  public S getStore()
    +  {
    +    return store;
    +  }
    +
    +  /**
    +   * Sets the store
    +   *
    +   * @param store
    +   */
    +  public void setStore(S store)
    +  {
    +    this.store = store;
    +  }
    +
    +  /**
    +   * Return yarn application id of running application
    +   * 
    +   * @return
    +   */
    +  public String getApplicationId()
    +  {
    +    return applicationId;
    +  }
    +
    +  /**
    +   * Set yarn application id
    +   * 
    +   * @param applicationId
    +   */
    +  public void setApplicationId(String applicationId)
    +  {
    +    this.applicationId = applicationId;
    +  }
    +
    +  /**
    +   * Generates key from operator id and window id to store unique operator
    +   * checkpoints
    +   * 
    +   * @param operatorId
    +   * @param windowId
    +   * @return unique key for store
    +   */
    +  public static String generateKey(int operatorId, long windowId)
    +  {
    +    return String.valueOf(operatorId) + CHECKPOINT_KEY_SEPARATOR + String.valueOf(windowId);
    --- End diff --
    
    Should application Id not be used to generate this key? What happens if there are multiple applications running and they have operators with same operatorId?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48798683
  
    --- Diff: pom.xml ---
    @@ -184,7 +184,6 @@
           <groupId>org.apache.apex</groupId>
           <artifactId>apex-engine</artifactId>
           <version>${apex.core.version}</version>
    -      <scope>test</scope>
    --- End diff --
    
    Malhar code (except tests) should only depend on apex-api and apex-common, not engine internals.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47956060
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + * Interface for KeyValue store
    + * 
    + */
    +public interface StorageAgentKeyValueStore
    +{
    +
    +  /**
    +   * Connects to the service.
    +   *
    +   * @throws IOException
    +   */
    +  public void connect() throws IOException;
    +
    +  /**
    +   * Disconnects from the service.
    +   *
    +   * @throws IOException
    +   */
    +  public void disconnect() throws IOException;
    +
    +  /**
    +   *
    +   * @return returns whether the service is connected.
    +   */
    +  public boolean isConnected();
    +
    +  /**
    +   * Gets the value given the key.
    +   *
    +   * @param key
    +   * @return the value
    +   */
    +  public Object get(Object key);
    +
    +  /**
    +   * Sets the key with the value in the store.
    +   *
    +   * @param key
    +   * @param value
    +   */
    +  public void put(Object key, Object value);
    +
    +  /**
    +   * Removes the key and the value given the key
    +   * 
    +   * @param key
    +   */
    +  public void remove(Object key);
    +
    +  /**
    +   * Get all the keys associated with key
    +   * 
    +   * @param key
    +   * @return the list of all associated keys
    +   */
    +  public List<String> getKeys(Object key);
    +
    +  /**
    +   * Set table/region name of store
    +   * 
    +   * @param tableName
    +   */
    +  public void setTableName(String tableName);
    --- End diff --
    
    I thought it would be common for any key value store service as most of them have notion  of table, region to denote to particular storage instance. 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47956411
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured be KeyValue store witch implementation of {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked from APEX-CORE once below feature is release
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements AppIdAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    +  public static final String CHECKPOINT_KEY_SEPARATOR = "-";
    +
    +  /**
    +   * Gets the store
    +   *
    +   * @return the store
    +   */
    +  public S getStore()
    +  {
    +    return store;
    +  }
    +
    +  /**
    +   * Sets the store
    +   *
    +   * @param store
    +   */
    +  public void setStore(S store)
    +  {
    +    this.store = store;
    +  }
    +
    +  /**
    +   * Return yarn application id of running application
    +   * 
    +   * @return
    +   */
    +  public String getApplicationId()
    +  {
    +    return applicationId;
    +  }
    +
    +  /**
    +   * Set yarn application id
    +   * 
    +   * @param applicationId
    +   */
    +  public void setApplicationId(String applicationId)
    +  {
    +    this.applicationId = applicationId;
    +  }
    +
    +  /**
    +   * Generates key from operator id and window id to store unique operator
    +   * checkpoints
    +   * 
    +   * @param operatorId
    +   * @param windowId
    +   * @return unique key for store
    +   */
    +  public static String generateKey(int operatorId, long windowId)
    +  {
    +    return String.valueOf(operatorId) + CHECKPOINT_KEY_SEPARATOR + String.valueOf(windowId);
    --- End diff --
    
    This would not happen as application ids are being used to create application specific tables or regions. 
    
    For checkpointing of each application's operators a new table/region is created once the application is started.
      


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48813311
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.api.Attribute.AttributeMap;
    +import com.datatorrent.lib.util.StorageAgent.ApplicationAwareStorageAgent;
    +import com.datatorrent.stram.plan.logical.LogicalPlan;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured be KeyValue store witch implementation of {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked from APEX-CORE once below feature is release
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements ApplicationAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    +  public static final String CHECKPOINT_KEY_SEPARATOR = "-";
    +
    +  /**
    +   * Gets the store
    +   *
    +   * @return the store
    +   */
    +  public S getStore()
    +  {
    +    return store;
    +  }
    +
    +  /**
    +   * Sets the store
    +   *
    +   * @param store
    +   */
    +  public void setStore(S store)
    +  {
    +    this.store = store;
    +  }
    +
    +  /**
    +   * Return yarn application id of running application
    +   * 
    +   * @return
    +   */
    +  public String getApplicationId()
    +  {
    +    return applicationId;
    +  }
    +
    +  /**
    +   * Set yarn application id
    +   * 
    +   * @param applicationId
    +   */
    +  public void setApplicationId(String applicationId)
    +  {
    +    this.applicationId = applicationId;
    +  }
    +
    +  /**
    +   * Generates key from operator id and window id to store unique operator
    +   * checkpoints
    +   * 
    +   * @param operatorId
    +   * @param windowId
    +   * @return unique key for store
    +   */
    +  public static String generateKey(int operatorId, long windowId)
    +  {
    +    return String.valueOf(operatorId) + CHECKPOINT_KEY_SEPARATOR + String.valueOf(windowId);
    +  }
    +
    +  /**
    +   * Stores the given operator object in configured store
    +   * 
    +   * @param object
    +   *          Operator object to store
    +   * @param operatorId
    +   *          of operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   * 
    +   */
    +  @Override
    +  public void save(Object object, int operatorId, long windowId) throws IOException
    +  {
    +
    +    try {
    +      store(generateKey(operatorId, windowId), object);
    +      logger.info("saved check point object key {} region {}", generateKey(operatorId, windowId), applicationId);
    +    } catch (Exception ex) {
    +      logger.info("while saving {} {}", operatorId, windowId, ex);
    +      throw new RuntimeException(ex);
    +    }
    +  }
    +
    +  private synchronized void store(String checkpointKey, Object operator) throws IOException
    +  {
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +    getStore().put(checkpointKey, operator);
    +  }
    +
    +  /**
    +   * Retrieves the operator object for given operator & window from configured
    +   * store
    +   * 
    +   * @param operatorId
    +   *          of operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   */
    +  @Override
    +  public Object load(int operatorId, long windowId)
    +  {
    +    Object obj = null;
    +    try {
    +      obj = retrieve(generateKey(operatorId, windowId));
    +      logger.info("retrieved object from store  key {} region {} ", generateKey(operatorId, windowId), applicationId);
    +    } catch (Exception ex) {
    +      logger.info("while loading {} {}", operatorId, windowId, ex);
    +      throw new RuntimeException(ex);
    +    }
    +
    +    return obj;
    +  }
    +
    +  private synchronized Object retrieve(String checkpointKey) throws IOException
    +  {
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +
    +    return getStore().get(checkpointKey);
    +  }
    +
    +  /**
    +   * Removes stored operator object for given operatorId & windowId from store
    +   * 
    +   */
    +  @Override
    +  public void delete(int operatorId, long windowId) throws IOException
    +  {
    +
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +
    +    try {
    +      getStore().remove(generateKey(operatorId, windowId));
    +      logger.info("deleted object from store key {} region {}", generateKey(operatorId, windowId));
    +    } catch (Exception ex) {
    +      logger.info("while deleting {} {}", operatorId, windowId, ex);
    +      throw new RuntimeException(ex);
    +    }
    +
    +  }
    +
    +  /**
    +   * Returns list window id for given operator id for which operator objects are
    +   * stored but not removed
    +   * 
    +   */
    +  @Override
    +  public long[] getWindowIds(int operatorId) throws IOException
    +  {
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +
    +    List<String> keys = getStore().getKeys(operatorId);
    +    if (keys.size() > 0) {
    +      long[] windowsIds = new long[keys.size()];
    +      int count = 0;
    +      for (String key : keys) {
    +        windowsIds[count] = extractwindowId(key);
    +        count++;
    +      }
    +      return windowsIds;
    +    } else {
    +      return new long[0];
    +    }
    +  }
    +
    +  public static long extractwindowId(String checkpointKey)
    +  {
    +    String[] parts = checkpointKey.split(CHECKPOINT_KEY_SEPARATOR);
    +    return Long.parseLong(parts[1]);
    +  }
    +
    +  /**
    +   * Saves the yarn application id which can be used by create application
    +   * specific table/region in KeyValue sore.
    +   */
    +  @Override
    +  public void setApplicationAttributes(AttributeMap map)
    +  {
    +    this.applicationId = map.get(LogicalPlan.APPLICATION_ID);
    --- End diff --
    
    Change this to DAGContext.APPLICATION_ID


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47785122
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AppIdAwareStorageAgent.java ---
    @@ -0,0 +1,37 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.lib.util;
    +
    +import com.datatorrent.api.StorageAgent;
    +
    +/**
    + * Interface to pass application id to storage agent
    + * 
    + *
    + */
    +public interface AppIdAwareStorageAgent extends StorageAgent
    --- End diff --
    
    Is this required here? Will ApplicationAwareStorageAgent defined in Apex not take care of this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48560143
  
    --- Diff: contrib/pom.xml ---
    @@ -623,5 +628,12 @@
           <artifactId>super-csv-joda</artifactId>
           <version>2.3.1</version>
         </dependency>
    +    <dependency>
    +      <groupId>com.gemstone.gemfire</groupId>
    +      <artifactId>gemfire</artifactId>
    +      <version>8.2.0</version>
    +      <type>jar</type>
    +      <scope>compile</scope>
    --- End diff --
    
    I meant that this dependency should be set optional


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48564053
  
    --- Diff: contrib/pom.xml ---
    @@ -623,5 +628,11 @@
           <artifactId>super-csv-joda</artifactId>
           <version>2.3.1</version>
         </dependency>
    +    <dependency>
    +      <groupId>com.gemstone.gemfire</groupId>
    +      <artifactId>gemfire</artifactId>
    +      <version>8.2.0</version>
    +      <type>jar</type>
    --- End diff --
    
    please add `<optional>true</optional>`
    `type` is by default jar so not required


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48786249
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + * Interface for KeyValue store
    + * 
    + */
    +public interface StorageAgentKeyValueStore
    --- End diff --
    
    Additional method can be added to extended Storage Agent. For the ones which are not required you can leave their implementation empty or throw exception


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47960875
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured be KeyValue store witch implementation of {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked from APEX-CORE once below feature is release
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements AppIdAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    +  public static final String CHECKPOINT_KEY_SEPARATOR = "-";
    +
    +  /**
    +   * Gets the store
    +   *
    +   * @return the store
    +   */
    +  public S getStore()
    +  {
    +    return store;
    +  }
    +
    +  /**
    +   * Sets the store
    +   *
    +   * @param store
    +   */
    +  public void setStore(S store)
    +  {
    +    this.store = store;
    +  }
    +
    +  /**
    +   * Return yarn application id of running application
    +   * 
    +   * @return
    +   */
    +  public String getApplicationId()
    +  {
    +    return applicationId;
    +  }
    +
    +  /**
    +   * Set yarn application id
    +   * 
    +   * @param applicationId
    +   */
    +  public void setApplicationId(String applicationId)
    +  {
    +    this.applicationId = applicationId;
    +  }
    +
    +  /**
    +   * Generates key from operator id and window id to store unique operator
    +   * checkpoints
    +   * 
    +   * @param operatorId
    +   * @param windowId
    +   * @return unique key for store
    +   */
    +  public static String generateKey(int operatorId, long windowId)
    +  {
    +    return String.valueOf(operatorId) + CHECKPOINT_KEY_SEPARATOR + String.valueOf(windowId);
    +  }
    +
    +  /**
    +   * Stores the given operator object in configured store
    +   * 
    +   * @param object
    +   *          Operator object to store
    +   * @param operatorId
    +   *          of operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   * 
    +   */
    +  @Override
    +  public void save(Object object, int operatorId, long windowId) throws IOException
    +  {
    +
    +    try {
    +      store(generateKey(operatorId, windowId), object);
    +      logger.info("saved check point object key {} region {}", generateKey(operatorId, windowId), applicationId);
    +    } catch (Throwable t) {
    --- End diff --
    
    You should not catch any exception. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48814740
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.api.Attribute.AttributeMap;
    +import com.datatorrent.lib.util.StorageAgent.ApplicationAwareStorageAgent;
    +import com.datatorrent.stram.plan.logical.LogicalPlan;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured be KeyValue store witch implementation of {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked from APEX-CORE once below feature is release
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements ApplicationAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    +  public static final String CHECKPOINT_KEY_SEPARATOR = "-";
    +
    +  /**
    +   * Gets the store
    +   *
    +   * @return the store
    +   */
    +  public S getStore()
    +  {
    +    return store;
    +  }
    +
    +  /**
    +   * Sets the store
    +   *
    +   * @param store
    +   */
    +  public void setStore(S store)
    +  {
    +    this.store = store;
    +  }
    +
    +  /**
    +   * Return yarn application id of running application
    +   * 
    +   * @return
    +   */
    +  public String getApplicationId()
    +  {
    +    return applicationId;
    +  }
    +
    +  /**
    +   * Set yarn application id
    +   * 
    +   * @param applicationId
    +   */
    +  public void setApplicationId(String applicationId)
    +  {
    +    this.applicationId = applicationId;
    +  }
    +
    +  /**
    +   * Generates key from operator id and window id to store unique operator
    +   * checkpoints
    +   * 
    +   * @param operatorId
    +   * @param windowId
    +   * @return unique key for store
    +   */
    +  public static String generateKey(int operatorId, long windowId)
    +  {
    +    return String.valueOf(operatorId) + CHECKPOINT_KEY_SEPARATOR + String.valueOf(windowId);
    +  }
    +
    +  /**
    +   * Stores the given operator object in configured store
    +   * 
    +   * @param object
    +   *          Operator object to store
    +   * @param operatorId
    +   *          of operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   * 
    +   */
    +  @Override
    +  public void save(Object object, int operatorId, long windowId) throws IOException
    +  {
    +
    +    try {
    +      store(generateKey(operatorId, windowId), object);
    +      logger.info("saved check point object key {} region {}", generateKey(operatorId, windowId), applicationId);
    +    } catch (Exception ex) {
    +      logger.info("while saving {} {}", operatorId, windowId, ex);
    +      throw new RuntimeException(ex);
    +    }
    +  }
    +
    +  private synchronized void store(String checkpointKey, Object operator) throws IOException
    +  {
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +    getStore().put(checkpointKey, operator);
    +  }
    +
    +  /**
    +   * Retrieves the operator object for given operator & window from configured
    +   * store
    +   * 
    +   * @param operatorId
    +   *          of operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   */
    +  @Override
    +  public Object load(int operatorId, long windowId)
    +  {
    +    Object obj = null;
    +    try {
    +      obj = retrieve(generateKey(operatorId, windowId));
    +      logger.info("retrieved object from store  key {} region {} ", generateKey(operatorId, windowId), applicationId);
    +    } catch (Exception ex) {
    +      logger.info("while loading {} {}", operatorId, windowId, ex);
    +      throw new RuntimeException(ex);
    +    }
    +
    +    return obj;
    +  }
    +
    +  private synchronized Object retrieve(String checkpointKey) throws IOException
    +  {
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +
    +    return getStore().get(checkpointKey);
    +  }
    +
    +  /**
    +   * Removes stored operator object for given operatorId & windowId from store
    +   * 
    +   */
    +  @Override
    +  public void delete(int operatorId, long windowId) throws IOException
    +  {
    +
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +
    +    try {
    +      getStore().remove(generateKey(operatorId, windowId));
    +      logger.info("deleted object from store key {} region {}", generateKey(operatorId, windowId));
    +    } catch (Exception ex) {
    +      logger.info("while deleting {} {}", operatorId, windowId, ex);
    +      throw new RuntimeException(ex);
    +    }
    +
    +  }
    +
    +  /**
    +   * Returns list window id for given operator id for which operator objects are
    +   * stored but not removed
    +   * 
    +   */
    +  @Override
    +  public long[] getWindowIds(int operatorId) throws IOException
    +  {
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +
    +    List<String> keys = getStore().getKeys(operatorId);
    +    if (keys.size() > 0) {
    +      long[] windowsIds = new long[keys.size()];
    +      int count = 0;
    +      for (String key : keys) {
    +        windowsIds[count] = extractwindowId(key);
    +        count++;
    +      }
    +      return windowsIds;
    +    } else {
    +      return new long[0];
    +    }
    +  }
    +
    +  public static long extractwindowId(String checkpointKey)
    +  {
    +    String[] parts = checkpointKey.split(CHECKPOINT_KEY_SEPARATOR);
    +    return Long.parseLong(parts[1]);
    +  }
    +
    +  /**
    +   * Saves the yarn application id which can be used by create application
    +   * specific table/region in KeyValue sore.
    +   */
    +  @Override
    +  public void setApplicationAttributes(AttributeMap map)
    +  {
    +    this.applicationId = map.get(LogicalPlan.APPLICATION_ID);
    --- End diff --
    
    Ok. will make these changes and push.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48274840
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + * Interface for KeyValue store
    + * 
    + */
    +public interface StorageAgentKeyValueStore
    +{
    +
    +  /**
    +   * Connects to the service.
    +   *
    +   * @throws IOException
    +   */
    +  public void connect() throws IOException;
    +
    +  /**
    +   * Disconnects from the service.
    +   *
    +   * @throws IOException
    +   */
    +  public void disconnect() throws IOException;
    +
    +  /**
    +   *
    +   * @return returns whether the service is connected.
    +   */
    +  public boolean isConnected();
    +
    +  /**
    +   * Gets the value given the key.
    +   *
    +   * @param key
    +   * @return the value
    +   */
    +  public Object get(Object key);
    +
    +  /**
    +   * Sets the key with the value in the store.
    +   *
    +   * @param key
    +   * @param value
    +   */
    +  public void put(Object key, Object value);
    +
    +  /**
    +   * Removes the key and the value given the key
    +   * 
    +   * @param key
    +   */
    +  public void remove(Object key);
    +
    +  /**
    +   * Get all the keys associated with key
    +   * 
    +   * @param key
    +   * @return the list of all associated keys
    +   */
    +  public List<String> getKeys(Object key);
    --- End diff --
    
    Rationale for getKeys() method.
    
    StorageAgent needs to provide below fours operations for operator checkpointing 
    1 save(opertorId, windowId)
    2 load(opertorId, windowId)
    3 delete(opertorId, windowId)
    4 getWindowIds(opertorId)
    
    All Key value based systems perform operations based on key so to accommodate these operations need to form a key with combination of operatorId & windowId to uniquely identify checkpoints.
    Note that each checkpoints will be written to different table/region by store implementation by using name provided by setTableName method.
    
    To support 4th operation of getting windows ids related given operator id store implementation will have to scan keys using some form of query and return related windowids.
    
    Please check provided concrete implementation of GeodeStore  
     


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48616767
  
    --- Diff: pom.xml ---
    @@ -184,7 +184,6 @@
           <groupId>org.apache.apex</groupId>
           <artifactId>apex-engine</artifactId>
           <version>${apex.core.version}</version>
    -      <scope>test</scope>
    --- End diff --
    
    Why this change?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48620383
  
    --- Diff: pom.xml ---
    @@ -184,7 +184,6 @@
           <groupId>org.apache.apex</groupId>
           <artifactId>apex-engine</artifactId>
           <version>${apex.core.version}</version>
    -      <scope>test</scope>
    --- End diff --
    
    This is required as AbstractKeyValueStorageAgent.java needs access to com.datatorrent.stram.plan.logical and test scope would cause compilation failure.
    
    Please check - incubator-apex-malhar/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java:[229,34]


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47785851
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + * Interface for KeyValue store
    + * 
    + */
    +public interface StorageAgentKeyValueStore
    +{
    +
    +  /**
    +   * Connects to the service.
    +   *
    +   * @throws IOException
    +   */
    +  public void connect() throws IOException;
    +
    +  /**
    +   * Disconnects from the service.
    +   *
    +   * @throws IOException
    +   */
    +  public void disconnect() throws IOException;
    +
    +  /**
    +   *
    +   * @return returns whether the service is connected.
    +   */
    +  public boolean isConnected();
    +
    +  /**
    +   * Gets the value given the key.
    +   *
    +   * @param key
    +   * @return the value
    +   */
    +  public Object get(Object key);
    +
    +  /**
    +   * Sets the key with the value in the store.
    +   *
    +   * @param key
    +   * @param value
    +   */
    +  public void put(Object key, Object value);
    +
    +  /**
    +   * Removes the key and the value given the key
    +   * 
    +   * @param key
    +   */
    +  public void remove(Object key);
    +
    +  /**
    +   * Get all the keys associated with key
    +   * 
    +   * @param key
    +   * @return the list of all associated keys
    +   */
    +  public List<String> getKeys(Object key);
    --- End diff --
    
    What is the usecase for this? Can you please give example how will it work?
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48620604
  
    --- Diff: contrib/pom.xml ---
    @@ -53,6 +53,11 @@
             <updatePolicy>daily</updatePolicy>
           </releases>
         </repository>
    +     <repository>
    --- End diff --
    
    No. Geode has not released a major release version yet.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r50181693
  
    --- Diff: contrib/pom.xml ---
    @@ -623,5 +628,11 @@
           <artifactId>super-csv-joda</artifactId>
           <version>2.3.1</version>
         </dependency>
    +    <dependency>
    +      <groupId>com.gemstone.gemfire</groupId>
    --- End diff --
    
    That would be preferred. We would get the dependency from Apache snapshots,
    right? In that case, no extra repository declaration should be required.
    
    On Tue, Jan 19, 2016 at 1:50 PM, William Markito <no...@github.com>
    wrote:
    
    > In contrib/pom.xml
    > <https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r50181242>
    > :
    >
    > > @@ -623,5 +628,11 @@
    > >        <artifactId>super-csv-joda</artifactId>
    > >        <version>2.3.1</version>
    > >      </dependency>
    > > +    <dependency>
    > > +      <groupId>com.gemstone.gemfire</groupId>
    >
    > Would you guys like to point it to Geode instead of GemFire ? For now it
    > would need to be pulled from incubator repository and from nightlies but a
    > release is just about to start.
    >
    >       <groupId>org.apache.geode</groupId>
    >       <artifactId>gemfire-core</artifactId>
    >       <version>${geode.version}</version>
    > </dependency>
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/incubator-apex-malhar/pull/125/files#r50181242>
    > .
    >



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47785600
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + * Interface for KeyValue store
    + * 
    + */
    +public interface StorageAgentKeyValueStore
    --- End diff --
    
    There is com.datatorrent.lib.db.KeyValueStore in malhar. Can this not be used? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47787431
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured be KeyValue store witch implementation of {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked from APEX-CORE once below feature is release
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements AppIdAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    --- End diff --
    
    This should reflect the changes for ApplicationAwareStorageAgent in Apex, which it is not


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47785678
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + * Interface for KeyValue store
    + * 
    + */
    +public interface StorageAgentKeyValueStore
    +{
    +
    +  /**
    +   * Connects to the service.
    +   *
    +   * @throws IOException
    +   */
    +  public void connect() throws IOException;
    +
    +  /**
    +   * Disconnects from the service.
    +   *
    +   * @throws IOException
    +   */
    +  public void disconnect() throws IOException;
    +
    +  /**
    +   *
    +   * @return returns whether the service is connected.
    +   */
    +  public boolean isConnected();
    +
    +  /**
    +   * Gets the value given the key.
    +   *
    +   * @param key
    +   * @return the value
    +   */
    +  public Object get(Object key);
    +
    +  /**
    +   * Sets the key with the value in the store.
    +   *
    +   * @param key
    +   * @param value
    +   */
    +  public void put(Object key, Object value);
    +
    +  /**
    +   * Removes the key and the value given the key
    +   * 
    +   * @param key
    +   */
    +  public void remove(Object key);
    +
    +  /**
    +   * Get all the keys associated with key
    +   * 
    +   * @param key
    +   * @return the list of all associated keys
    +   */
    +  public List<String> getKeys(Object key);
    +
    +  /**
    +   * Set table/region name of store
    +   * 
    +   * @param tableName
    +   */
    +  public void setTableName(String tableName);
    --- End diff --
    
    This is very specific to an implementation.. I don't think this is required in API


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48813010
  
    --- Diff: pom.xml ---
    @@ -184,7 +184,6 @@
           <groupId>org.apache.apex</groupId>
           <artifactId>apex-engine</artifactId>
           <version>${apex.core.version}</version>
    -      <scope>test</scope>
    --- End diff --
    
    Hi @tweise 
    Can you please take a look at incubator-apex-malhar/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java:[229,34]
    
    and suggest what can be done to accommodate this change without changing test score of apex-engine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47955341
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AppIdAwareStorageAgent.java ---
    @@ -0,0 +1,37 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.lib.util;
    +
    +import com.datatorrent.api.StorageAgent;
    +
    +/**
    + * Interface to pass application id to storage agent
    + * 
    + *
    + */
    +public interface AppIdAwareStorageAgent extends StorageAgent
    --- End diff --
    
    This is required as ApplicationAwareStorageAgent is not released in Apex core and build takes core dependency of release candidate but not develop branches.
     


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/125


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48813336
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.api.Attribute.AttributeMap;
    +import com.datatorrent.lib.util.StorageAgent.ApplicationAwareStorageAgent;
    +import com.datatorrent.stram.plan.logical.LogicalPlan;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured be KeyValue store witch implementation of {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked from APEX-CORE once below feature is release
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements ApplicationAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    +  public static final String CHECKPOINT_KEY_SEPARATOR = "-";
    +
    +  /**
    +   * Gets the store
    +   *
    +   * @return the store
    +   */
    +  public S getStore()
    +  {
    +    return store;
    +  }
    +
    +  /**
    +   * Sets the store
    +   *
    +   * @param store
    +   */
    +  public void setStore(S store)
    +  {
    +    this.store = store;
    +  }
    +
    +  /**
    +   * Return yarn application id of running application
    +   * 
    +   * @return
    +   */
    +  public String getApplicationId()
    +  {
    +    return applicationId;
    +  }
    +
    +  /**
    +   * Set yarn application id
    +   * 
    +   * @param applicationId
    +   */
    +  public void setApplicationId(String applicationId)
    +  {
    +    this.applicationId = applicationId;
    +  }
    +
    +  /**
    +   * Generates key from operator id and window id to store unique operator
    +   * checkpoints
    +   * 
    +   * @param operatorId
    +   * @param windowId
    +   * @return unique key for store
    +   */
    +  public static String generateKey(int operatorId, long windowId)
    +  {
    +    return String.valueOf(operatorId) + CHECKPOINT_KEY_SEPARATOR + String.valueOf(windowId);
    +  }
    +
    +  /**
    +   * Stores the given operator object in configured store
    +   * 
    +   * @param object
    +   *          Operator object to store
    +   * @param operatorId
    +   *          of operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   * 
    +   */
    +  @Override
    +  public void save(Object object, int operatorId, long windowId) throws IOException
    +  {
    +
    +    try {
    +      store(generateKey(operatorId, windowId), object);
    +      logger.info("saved check point object key {} region {}", generateKey(operatorId, windowId), applicationId);
    +    } catch (Exception ex) {
    +      logger.info("while saving {} {}", operatorId, windowId, ex);
    +      throw new RuntimeException(ex);
    +    }
    +  }
    +
    +  private synchronized void store(String checkpointKey, Object operator) throws IOException
    +  {
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +    getStore().put(checkpointKey, operator);
    +  }
    +
    +  /**
    +   * Retrieves the operator object for given operator & window from configured
    +   * store
    +   * 
    +   * @param operatorId
    +   *          of operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   */
    +  @Override
    +  public Object load(int operatorId, long windowId)
    +  {
    +    Object obj = null;
    +    try {
    +      obj = retrieve(generateKey(operatorId, windowId));
    +      logger.info("retrieved object from store  key {} region {} ", generateKey(operatorId, windowId), applicationId);
    +    } catch (Exception ex) {
    +      logger.info("while loading {} {}", operatorId, windowId, ex);
    +      throw new RuntimeException(ex);
    +    }
    +
    +    return obj;
    +  }
    +
    +  private synchronized Object retrieve(String checkpointKey) throws IOException
    +  {
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +
    +    return getStore().get(checkpointKey);
    +  }
    +
    +  /**
    +   * Removes stored operator object for given operatorId & windowId from store
    +   * 
    +   */
    +  @Override
    +  public void delete(int operatorId, long windowId) throws IOException
    +  {
    +
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +
    +    try {
    +      getStore().remove(generateKey(operatorId, windowId));
    +      logger.info("deleted object from store key {} region {}", generateKey(operatorId, windowId));
    --- End diff --
    
    debug


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48563172
  
    --- Diff: contrib/pom.xml ---
    @@ -623,5 +628,12 @@
           <artifactId>super-csv-joda</artifactId>
           <version>2.3.1</version>
         </dependency>
    +    <dependency>
    +      <groupId>com.gemstone.gemfire</groupId>
    +      <artifactId>gemfire</artifactId>
    +      <version>8.2.0</version>
    +      <type>jar</type>
    +      <scope>compile</scope>
    --- End diff --
    
    Yes dependency compile scope is default and should be optional. Fixed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by markito <gi...@git.apache.org>.
Github user markito commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r50181242
  
    --- Diff: contrib/pom.xml ---
    @@ -623,5 +628,11 @@
           <artifactId>super-csv-joda</artifactId>
           <version>2.3.1</version>
         </dependency>
    +    <dependency>
    +      <groupId>com.gemstone.gemfire</groupId>
    --- End diff --
    
    Would you guys like to point it to Geode instead of GemFire ?  For now it would need to be pulled from incubator repository and from nightlies but a release is just about to start. 
    
    ```<dependency>
          <groupId>org.apache.geode</groupId>
          <artifactId>gemfire-core</artifactId>
          <version>${geode.version}</version>
    </dependency>
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47956815
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured be KeyValue store witch implementation of {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked from APEX-CORE once below feature is release
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements AppIdAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    --- End diff --
    
    Yes


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48559989
  
    --- Diff: contrib/pom.xml ---
    @@ -623,5 +628,12 @@
           <artifactId>super-csv-joda</artifactId>
           <version>2.3.1</version>
         </dependency>
    +    <dependency>
    +      <groupId>com.gemstone.gemfire</groupId>
    +      <artifactId>gemfire</artifactId>
    +      <version>8.2.0</version>
    +      <type>jar</type>
    +      <scope>compile</scope>
    --- End diff --
    
    Default scope is compile so not required. I think the scope should be optional


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47787032
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured be KeyValue store witch implementation of {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked from APEX-CORE once below feature is release
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements AppIdAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    +  public static final String CHECKPOINT_KEY_SEPARATOR = "-";
    +
    +  /**
    +   * Gets the store
    +   *
    +   * @return the store
    +   */
    +  public S getStore()
    +  {
    +    return store;
    +  }
    +
    +  /**
    +   * Sets the store
    +   *
    +   * @param store
    +   */
    +  public void setStore(S store)
    +  {
    +    this.store = store;
    +  }
    +
    +  /**
    +   * Return yarn application id of running application
    +   * 
    +   * @return
    +   */
    +  public String getApplicationId()
    +  {
    +    return applicationId;
    +  }
    +
    +  /**
    +   * Set yarn application id
    +   * 
    +   * @param applicationId
    +   */
    +  public void setApplicationId(String applicationId)
    +  {
    +    this.applicationId = applicationId;
    +  }
    +
    +  /**
    +   * Generates key from operator id and window id to store unique operator
    +   * checkpoints
    +   * 
    +   * @param operatorId
    +   * @param windowId
    +   * @return unique key for store
    +   */
    +  public static String generateKey(int operatorId, long windowId)
    +  {
    +    return String.valueOf(operatorId) + CHECKPOINT_KEY_SEPARATOR + String.valueOf(windowId);
    +  }
    +
    +  /**
    +   * Stores the given operator object in configured store
    +   * 
    +   * @param object
    +   *          Operator object to store
    +   * @param operatorId
    +   *          of operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   * 
    +   */
    +  @Override
    +  public void save(Object object, int operatorId, long windowId) throws IOException
    +  {
    +
    +    try {
    +      store(generateKey(operatorId, windowId), object);
    +      logger.info("saved check point object key {} region {}", generateKey(operatorId, windowId), applicationId);
    +    } catch (Throwable t) {
    +      logger.info("while saving {} {}", operatorId, windowId, t);
    +      DTThrowable.rethrow(t);
    +    }
    +  }
    +
    +  private synchronized void store(String checkpointKey, Object operator) throws IOException
    +  {
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +    getStore().put(checkpointKey, operator);
    +  }
    +
    +  /**
    +   * Retrieves the operator object for given operator & window from configured
    +   * store
    +   * 
    +   * @param operatorId
    +   *          of operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   */
    +  @Override
    +  public Object load(int operatorId, long windowId)
    +  {
    +    Object obj = null;
    +    try {
    +      obj = retrieve(generateKey(operatorId, windowId));
    +      logger.info("retrieved object from store  key {} region {} ", generateKey(operatorId, windowId), applicationId);
    +    } catch (Throwable t) {
    +      logger.info("while loading {} {}", operatorId, windowId, t);
    +      DTThrowable.rethrow(t);
    +    }
    +
    +    return obj;
    +  }
    +
    +  private synchronized Object retrieve(String checkpointKey) throws IOException
    +  {
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +
    +    return getStore().get(checkpointKey);
    +  }
    +
    +  /**
    +   * Removes stored operator object for given operatorId & windowId from store
    +   * 
    +   */
    +  @Override
    +  public void delete(int operatorId, long windowId) throws IOException
    +  {
    +
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +
    +    try {
    +      store.remove(generateKey(operatorId, windowId));
    +      logger.info("deleted object from store key {} region {}", generateKey(operatorId, windowId));
    +    } catch (Throwable t) {
    --- End diff --
    
    Why catch Throwable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47787258
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured be KeyValue store witch implementation of {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked from APEX-CORE once below feature is release
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements AppIdAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    --- End diff --
    
    should applicationId be final?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47786887
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured be KeyValue store witch implementation of {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked from APEX-CORE once below feature is release
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements AppIdAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    +  public static final String CHECKPOINT_KEY_SEPARATOR = "-";
    +
    +  /**
    +   * Gets the store
    +   *
    +   * @return the store
    +   */
    +  public S getStore()
    +  {
    +    return store;
    +  }
    +
    +  /**
    +   * Sets the store
    +   *
    +   * @param store
    +   */
    +  public void setStore(S store)
    +  {
    +    this.store = store;
    +  }
    +
    +  /**
    +   * Return yarn application id of running application
    +   * 
    +   * @return
    +   */
    +  public String getApplicationId()
    +  {
    +    return applicationId;
    +  }
    +
    +  /**
    +   * Set yarn application id
    +   * 
    +   * @param applicationId
    +   */
    +  public void setApplicationId(String applicationId)
    +  {
    +    this.applicationId = applicationId;
    +  }
    +
    +  /**
    +   * Generates key from operator id and window id to store unique operator
    +   * checkpoints
    +   * 
    +   * @param operatorId
    +   * @param windowId
    +   * @return unique key for store
    +   */
    +  public static String generateKey(int operatorId, long windowId)
    +  {
    +    return String.valueOf(operatorId) + CHECKPOINT_KEY_SEPARATOR + String.valueOf(windowId);
    +  }
    +
    +  /**
    +   * Stores the given operator object in configured store
    +   * 
    +   * @param object
    +   *          Operator object to store
    +   * @param operatorId
    +   *          of operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   * 
    +   */
    +  @Override
    +  public void save(Object object, int operatorId, long windowId) throws IOException
    +  {
    +
    +    try {
    +      store(generateKey(operatorId, windowId), object);
    +      logger.info("saved check point object key {} region {}", generateKey(operatorId, windowId), applicationId);
    +    } catch (Throwable t) {
    --- End diff --
    
    why catch Throwable? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47956750
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured be KeyValue store witch implementation of {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked from APEX-CORE once below feature is release
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements AppIdAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    +  public static final String CHECKPOINT_KEY_SEPARATOR = "-";
    +
    +  /**
    +   * Gets the store
    +   *
    +   * @return the store
    +   */
    +  public S getStore()
    +  {
    +    return store;
    +  }
    +
    +  /**
    +   * Sets the store
    +   *
    +   * @param store
    +   */
    +  public void setStore(S store)
    +  {
    +    this.store = store;
    +  }
    +
    +  /**
    +   * Return yarn application id of running application
    +   * 
    +   * @return
    +   */
    +  public String getApplicationId()
    +  {
    +    return applicationId;
    +  }
    +
    +  /**
    +   * Set yarn application id
    +   * 
    +   * @param applicationId
    +   */
    +  public void setApplicationId(String applicationId)
    +  {
    +    this.applicationId = applicationId;
    +  }
    +
    +  /**
    +   * Generates key from operator id and window id to store unique operator
    +   * checkpoints
    +   * 
    +   * @param operatorId
    +   * @param windowId
    +   * @return unique key for store
    +   */
    +  public static String generateKey(int operatorId, long windowId)
    +  {
    +    return String.valueOf(operatorId) + CHECKPOINT_KEY_SEPARATOR + String.valueOf(windowId);
    +  }
    +
    +  /**
    +   * Stores the given operator object in configured store
    +   * 
    +   * @param object
    +   *          Operator object to store
    +   * @param operatorId
    +   *          of operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   * 
    +   */
    +  @Override
    +  public void save(Object object, int operatorId, long windowId) throws IOException
    +  {
    +
    +    try {
    +      store(generateKey(operatorId, windowId), object);
    +      logger.info("saved check point object key {} region {}", generateKey(operatorId, windowId), applicationId);
    +    } catch (Throwable t) {
    --- End diff --
    
    Initial though was, since this is abstract implementation and to ensure that any errors or exceptions thrown by underlying store should not cause disruption to application.
    
    I can change this to catch to run time exception instead. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r48813326
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.api.Attribute.AttributeMap;
    +import com.datatorrent.lib.util.StorageAgent.ApplicationAwareStorageAgent;
    +import com.datatorrent.stram.plan.logical.LogicalPlan;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured be KeyValue store witch implementation of {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked from APEX-CORE once below feature is release
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements ApplicationAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    +  public static final String CHECKPOINT_KEY_SEPARATOR = "-";
    +
    +  /**
    +   * Gets the store
    +   *
    +   * @return the store
    +   */
    +  public S getStore()
    +  {
    +    return store;
    +  }
    +
    +  /**
    +   * Sets the store
    +   *
    +   * @param store
    +   */
    +  public void setStore(S store)
    +  {
    +    this.store = store;
    +  }
    +
    +  /**
    +   * Return yarn application id of running application
    +   * 
    +   * @return
    +   */
    +  public String getApplicationId()
    +  {
    +    return applicationId;
    +  }
    +
    +  /**
    +   * Set yarn application id
    +   * 
    +   * @param applicationId
    +   */
    +  public void setApplicationId(String applicationId)
    +  {
    +    this.applicationId = applicationId;
    +  }
    +
    +  /**
    +   * Generates key from operator id and window id to store unique operator
    +   * checkpoints
    +   * 
    +   * @param operatorId
    +   * @param windowId
    +   * @return unique key for store
    +   */
    +  public static String generateKey(int operatorId, long windowId)
    +  {
    +    return String.valueOf(operatorId) + CHECKPOINT_KEY_SEPARATOR + String.valueOf(windowId);
    +  }
    +
    +  /**
    +   * Stores the given operator object in configured store
    +   * 
    +   * @param object
    +   *          Operator object to store
    +   * @param operatorId
    +   *          of operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   * 
    +   */
    +  @Override
    +  public void save(Object object, int operatorId, long windowId) throws IOException
    +  {
    +
    +    try {
    +      store(generateKey(operatorId, windowId), object);
    +      logger.info("saved check point object key {} region {}", generateKey(operatorId, windowId), applicationId);
    +    } catch (Exception ex) {
    +      logger.info("while saving {} {}", operatorId, windowId, ex);
    +      throw new RuntimeException(ex);
    +    }
    +  }
    +
    +  private synchronized void store(String checkpointKey, Object operator) throws IOException
    +  {
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +    getStore().put(checkpointKey, operator);
    +  }
    +
    +  /**
    +   * Retrieves the operator object for given operator & window from configured
    +   * store
    +   * 
    +   * @param operatorId
    +   *          of operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   */
    +  @Override
    +  public Object load(int operatorId, long windowId)
    +  {
    +    Object obj = null;
    +    try {
    +      obj = retrieve(generateKey(operatorId, windowId));
    +      logger.info("retrieved object from store  key {} region {} ", generateKey(operatorId, windowId), applicationId);
    --- End diff --
    
    change this to debug


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...

Posted by ashishtadose <gi...@git.apache.org>.
Github user ashishtadose commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47955679
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + * Interface for KeyValue store
    + * 
    + */
    +public interface StorageAgentKeyValueStore
    --- End diff --
    
    com.datatorrent.lib.db.KeyValueStore it contains additional methods such as getAll & putAll which are not relevant here also this requires one additional method which is not present there.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---