Posted to dev@apex.apache.org by DT-Priyanka <gi...@git.apache.org> on 2016/02/10 10:16:11 UTC

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

GitHub user DT-Priyanka opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186

    APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/DT-Priyanka/incubator-apex-malhar APEXMALHAR-1988-cassandra-input

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/186.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #186
    
----
commit 59e9e443e3dde6330f320049e33c357235ff65dc
Author: Priyanka Gugale <pr...@datatorrent.com>
Date:   2016-02-10T09:15:03Z

    APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r63607742
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---
    @@ -45,7 +47,8 @@
     public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> {
     
       private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
    -
    +  private PagingState nextPageState;
    +  private int fetchSize;
       int waitForDataTimeout = 100;
       @AutoMetric
       protected long tuplesRead;
    --- End diff --
    
    I will remove this for now, but it needs more consideration. I will reintroduce it in another PR if required.


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r62766002
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---
    @@ -45,7 +47,8 @@
     public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> {
     
       private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
    -
    +  private PagingState nextPageState;
    +  private int fetchSize;
       int waitForDataTimeout = 100;
       @AutoMetric
       protected long tuplesRead;
    --- End diff --
    
    One use case I have right now is showing the correct count on the UI. There could be other use cases where people need an accurate count of emitted tuples.


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r62567410
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---
    @@ -112,8 +115,15 @@ public void emitTuples()
         String query = queryToRetrieveData();
         logger.debug("select statement: {}", query);
     
    +    SimpleStatement stmt = new SimpleStatement(query);
    +    stmt.setFetchSize(fetchSize);
    --- End diff --
    
    Can this not be done once, rather than every time `emitTuples()` is invoked?


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/186


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r62566940
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---
    @@ -45,7 +47,8 @@
     public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> {
     
       private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
    -
    +  private PagingState nextPageState;
    +  private int fetchSize;
       int waitForDataTimeout = 100;
       @AutoMetric
       protected long tuplesRead;
    --- End diff --
    
    Why is an additional auto metric added here?
    Isn't this already part of the operator stats, in the form of tuples processed?


[GitHub] incubator-apex-malhar pull request: [review only] APEXMALHAR-1988:...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r55980645
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java ---
    @@ -134,7 +119,7 @@ public void setStartRow(Number startRow)
       /*
        * Parameterized query with parameters such as %t for table name , %p for primary key, %s for start value and %l for limit.
        * Example of retrieveQuery:
    -   * select * from %t where token(%p) > %s limit %l;
    +   * select * from %t where token(%p) > %s;
    --- End diff --
    
    Also remove "%l for limit" from the javadoc.
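For context, a minimal sketch of how such a template might be expanded, assuming plain string substitution. `RetrieveQueryTemplate` and `expand` are hypothetical names, not the operator's actual code, and the `users`/`id` values are illustrative. With paging, the page size comes from the statement's fetch size rather than a `LIMIT` clause, which is why `%l` no longer appears.

```java
/**
 * Hypothetical helper illustrating the retrieveQuery placeholders:
 * %t = table name, %p = primary key column, %s = start value.
 * There is no %l: the page size is set through the statement's fetch size instead.
 */
final class RetrieveQueryTemplate
{
  private RetrieveQueryTemplate()
  {
  }

  /** Substitutes each placeholder with the given value using plain string replacement. */
  static String expand(String template, String table, String primaryKey, Object startValue)
  {
    return template
        .replace("%t", table)
        .replace("%p", primaryKey)
        .replace("%s", String.valueOf(startValue));
  }

  public static void main(String[] args)
  {
    // Start from the smallest possible token so the first query covers the whole ring.
    String query = expand("select * from %t where token(%p) > %s;", "users", "id", Long.MIN_VALUE);
    System.out.println(query);
  }
}
```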


[GitHub] incubator-apex-malhar pull request: [review only] APEXMALHAR-1988:...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r55993675
  
    --- Diff: contrib/pom.xml ---
    @@ -477,10 +477,17 @@
         <dependency>
           <groupId>com.datastax.cassandra</groupId>
           <artifactId>cassandra-driver-core</artifactId>
    -      <version>2.0.2</version>
    +      <version>2.1.8</version>
           <optional>true</optional>
         </dependency>
         <dependency>
    +	  <groupId>com.google.guava</groupId>
    --- End diff --
    
    I will rebase it; this change is required in another PR for the protocol version changes.


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r62769280
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---
    @@ -45,7 +47,8 @@
     public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> {
     
       private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
    -
    +  private PagingState nextPageState;
    +  private int fetchSize;
       int waitForDataTimeout = 100;
       @AutoMetric
       protected long tuplesRead;
    --- End diff --
    
    @DT-Priyanka 
    If the tuple count is not correct, isn't that a problem with all operators?
    In that case, why did we decide to add this user metric to just this operator?

    I am asking because I want to understand the use case that made it necessary to add this metric only to the Cassandra input operator.


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r62757371
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---
    @@ -45,7 +47,8 @@
     public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> {
     
       private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
    -
    +  private PagingState nextPageState;
    +  private int fetchSize;
       int waitForDataTimeout = 100;
       @AutoMetric
       protected long tuplesRead;
    --- End diff --
    
    The reason we decided to add it was that if an operator dies, the count reported as the tuple count may not be correct. Do we have a better solution for this?
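To make the concern concrete, here is a toy simulation in plain Java. It uses no Apex APIs, and the window/checkpoint model in `ReplayCountSketch` is a hypothetical simplification, not how the platform actually computes its stats. It assumes that after a failure the operator is restored from its last checkpoint and the later windows are replayed; under that assumption a counter kept in checkpointed operator state ends at the true number of rows, while a counter that is never rolled back also counts the replayed tuples.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Toy model (not Apex code): compares a counter kept in checkpointed operator state
 * with a counter that is never rolled back, across one failure and replay.
 */
final class ReplayCountSketch
{
  private ReplayCountSketch()
  {
  }

  /**
   * Processes every window, checkpoints the state counter after window index
   * checkpointWindow, then simulates a failure: the state counter is restored
   * from the checkpoint and all later windows are replayed.
   *
   * @return {stateCounter, neverRolledBackCounter} after recovery
   */
  static long[] simulate(List<Integer> tuplesPerWindow, int checkpointWindow)
  {
    long stateCounter = 0;           // restored from the checkpoint on recovery
    long neverRolledBackCounter = 0; // keeps counting across the failure
    long checkpointedValue = 0;

    for (int w = 0; w < tuplesPerWindow.size(); w++) {
      stateCounter += tuplesPerWindow.get(w);
      neverRolledBackCounter += tuplesPerWindow.get(w);
      if (w == checkpointWindow) {
        checkpointedValue = stateCounter;
      }
    }

    // Failure after the last window: restore state, replay windows after the checkpoint.
    stateCounter = checkpointedValue;
    for (int w = checkpointWindow + 1; w < tuplesPerWindow.size(); w++) {
      stateCounter += tuplesPerWindow.get(w);
      neverRolledBackCounter += tuplesPerWindow.get(w);
    }
    return new long[] {stateCounter, neverRolledBackCounter};
  }

  public static void main(String[] args)
  {
    long[] counts = simulate(Arrays.asList(10, 10, 10, 10, 10), 2);
    System.out.println("state=" + counts[0] + ", neverRolledBack=" + counts[1]);
  }
}
```

With five windows of 10 rows and a checkpoint after the third window, the state counter ends at 50 (the distinct rows read) while the never-rolled-back counter ends at 70, because the last two windows were counted twice.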


[GitHub] incubator-apex-malhar pull request: [review only] APEXMALHAR-1988:...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r55982746
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---
    @@ -126,4 +140,17 @@ protected void emit(T tuple)
       {
         outputPort.emit(tuple);
       }
    +
    +  /*
    +   * Number of records to be fetched at one time from cassandra table.
    +   */
    +  public int getLimit()
    +  {
    +    return limit;
    +  }
    +
    +  public void setLimit(int limit)
    --- End diff --
    
    Add javadocs
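One possible shape for the requested javadoc, sketched on a hypothetical stand-alone class (`PagingConfigSketch`). The field is named `fetchSize` here, as in the later revision of the diff; the wording is an assumption, not the merged text.

```java
/**
 * Hypothetical stand-in for the operator's paging configuration.
 */
class PagingConfigSketch
{
  /** Number of rows requested from Cassandra per page. */
  private int fetchSize;

  /**
   * Returns the number of rows fetched from the Cassandra table in a single page.
   *
   * @return the fetch (page) size
   */
  public int getFetchSize()
  {
    return fetchSize;
  }

  /**
   * Sets the number of rows fetched from the Cassandra table in a single page.
   * The value is applied to each query statement via the driver's
   * Statement#setFetchSize(int).
   *
   * @param fetchSize rows per page
   */
  public void setFetchSize(int fetchSize)
  {
    this.fetchSize = fetchSize;
  }
}
```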


[GitHub] incubator-apex-malhar pull request: [review only] APEXMALHAR-1988:...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r55993453
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---
    @@ -104,19 +108,29 @@ public void emitTuples()
         String query = queryToRetrieveData();
         logger.debug("select statement: {}", query);
     
    +    SimpleStatement stmt = new SimpleStatement(query);
    +    stmt.setFetchSize(limit);
         try {
    -      ResultSet result = store.getSession().execute(query);
    +      if (nextPageState != null) {
    +        stmt.setPagingState(nextPageState);
    +      }
    +      ResultSet result = store.getSession().execute(stmt);
    +      nextPageState = result.getExecutionInfo().getPagingState();
    +
    +      int remaining = result.getAvailableWithoutFetching();
           if (!result.isExhausted()) {
             for (Row row : result) {
               T tuple = getTuple(row);
               emit(tuple);
    +          if (--remaining == 0) {
    --- End diff --
    
    removing.


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/186


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r62565985
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java ---
    @@ -61,6 +61,7 @@
     @Evolving
     public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext>
     {
    +  private String TOKEN_QUERY;
    --- End diff --
    
    Please follow Java naming conventions. This field is not a constant, so it should be in camel case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

Posted by DT-Priyanka <gi...@git.apache.org>.
GitHub user DT-Priyanka reopened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186

    APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/DT-Priyanka/incubator-apex-malhar APEXMALHAR-1988-cassandra-input

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/186.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #186
    
----
commit f077007a7546e0c15baac387a8555f4ae6737c3c
Author: Priyanka Gugale <pr...@datatorrent.com>
Date:   2016-02-10T09:15:03Z

    APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature

----


[GitHub] incubator-apex-malhar pull request: [review only] APEXMALHAR-1988:...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r55979742
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---
    @@ -104,19 +108,29 @@ public void emitTuples()
         String query = queryToRetrieveData();
         logger.debug("select statement: {}", query);
     
    +    SimpleStatement stmt = new SimpleStatement(query);
    +    stmt.setFetchSize(limit);
         try {
    -      ResultSet result = store.getSession().execute(query);
    +      if (nextPageState != null) {
    +        stmt.setPagingState(nextPageState);
    +      }
    +      ResultSet result = store.getSession().execute(stmt);
    +      nextPageState = result.getExecutionInfo().getPagingState();
    +
    +      int remaining = result.getAvailableWithoutFetching();
           if (!result.isExhausted()) {
             for (Row row : result) {
               T tuple = getTuple(row);
               emit(tuple);
    +          if (--remaining == 0) {
    --- End diff --
    
    Is this if() needed?
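Some background on what the `if (--remaining == 0)` check guards against. As I understand the DataStax Java driver 2.x, iterating over a `ResultSet` transparently fetches the next page once the current one is used up. Without a break, a single `emitTuples()` call would therefore read every remaining page, and the saved `PagingState` would no longer mark where the next call should resume. The driver's paging documentation uses a similar `getAvailableWithoutFetching()` countdown. The sketch below does not use the driver: `AutoFetchingPages` is a hypothetical in-memory stand-in that mimics that auto-fetching iteration, with an integer start index playing the role of the paging state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical in-memory stand-in for a paged result set that auto-fetches on iteration. */
final class AutoFetchingPages implements Iterable<Integer>
{
  private final List<Integer> rows;
  private final int start;     // where this page begins (plays the role of a paging state)
  private final int fetchSize;

  AutoFetchingPages(List<Integer> rows, int start, int fetchSize)
  {
    this.rows = rows;
    this.start = start;
    this.fetchSize = fetchSize;
  }

  /** Rows in the current page, i.e. available without another fetch. */
  int availableWithoutFetching()
  {
    return Math.min(fetchSize, rows.size() - start);
  }

  /** Iterates to the end of the data, silently "fetching" later pages. */
  @Override
  public Iterator<Integer> iterator()
  {
    return rows.subList(start, rows.size()).iterator();
  }

  /** One emitTuples()-style call; returns the rows it would emit. */
  static List<Integer> emitOnePage(List<Integer> rows, int start, int fetchSize, boolean stopAtPageEnd)
  {
    AutoFetchingPages result = new AutoFetchingPages(rows, start, fetchSize);
    List<Integer> emitted = new ArrayList<>();
    int remaining = result.availableWithoutFetching();
    for (Integer row : result) {
      emitted.add(row);
      // Same guard as in the diff: stop before the iterator crosses into the next page.
      if (stopAtPageEnd && --remaining == 0) {
        break;
      }
    }
    return emitted;
  }

  public static void main(String[] args)
  {
    List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
    System.out.println("with guard:    " + emitOnePage(rows, 0, 3, true));
    System.out.println("without guard: " + emitOnePage(rows, 0, 3, false));
  }
}
```

With a fetch size of 3, the guarded call emits only the first page, while the unguarded call drains all seven rows in one go.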


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r62758965
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---
    @@ -45,7 +47,8 @@
     public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> {
     
       private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
    -
    +  private PagingState nextPageState;
    +  private int fetchSize;
       int waitForDataTimeout = 100;
       @AutoMetric
       protected long tuplesRead;
    --- End diff --
    
    @DT-Priyanka what is the use case for having a tuple count metric?


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r62758000
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---
    @@ -112,8 +115,15 @@ public void emitTuples()
         String query = queryToRetrieveData();
         logger.debug("select statement: {}", query);
     
    +    SimpleStatement stmt = new SimpleStatement(query);
    +    stmt.setFetchSize(fetchSize);
    --- End diff --
    
    The query is fetched on each call via `queryToRetrieveData()`. In the current implementation the query doesn't change much, but anyone can implement that method to change the query parameters on each run. In that case we need to create the statement from the current query. This is an abstract class, so I am trying to make sure it supports all known use cases.
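A small sketch of that scenario, using a hypothetical subclass whose `queryToRetrieveData()` advances a token bound between calls. `QuerySource`, `AdvancingTokenQuery`, `advanceTo` and the `users`/`id` names are illustrative only and not part of the operator. A statement cached from the first call would keep the stale query text, whereas building it per call picks up the current one.

```java
/** Hypothetical stand-in for the abstract operator's query hook. */
abstract class QuerySource
{
  abstract String queryToRetrieveData();
}

/** Hypothetical subclass whose query changes between calls. */
class AdvancingTokenQuery extends QuerySource
{
  private long lastToken = Long.MIN_VALUE;

  /** Pretend the last emitted row had this token, so the next query starts after it. */
  void advanceTo(long token)
  {
    lastToken = token;
  }

  @Override
  String queryToRetrieveData()
  {
    return "select * from users where token(id) > " + lastToken + ";";
  }

  public static void main(String[] args)
  {
    AdvancingTokenQuery source = new AdvancingTokenQuery();
    String cached = source.queryToRetrieveData();  // what a statement built once would hold
    source.advanceTo(42L);
    String current = source.queryToRetrieveData(); // what a per-call statement would use
    System.out.println(cached);
    System.out.println(current);
  }
}
```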


[GitHub] incubator-apex-malhar pull request: [review only] APEXMALHAR-1988:...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/186#discussion_r55979076
  
    --- Diff: contrib/pom.xml ---
    @@ -477,10 +477,17 @@
         <dependency>
           <groupId>com.datastax.cassandra</groupId>
           <artifactId>cassandra-driver-core</artifactId>
    -      <version>2.0.2</version>
    +      <version>2.1.8</version>
           <optional>true</optional>
         </dependency>
         <dependency>
    +	  <groupId>com.google.guava</groupId>
    --- End diff --
    
    Same changes as in #186 

