Posted to dev@apex.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/02/15 10:56:18 UTC

[jira] [Commented] (APEXMALHAR-1985) Cassandra Input Operator: startRow set incorrectly

    [ https://issues.apache.org/jira/browse/APEXMALHAR-1985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15147143#comment-15147143 ] 

ASF GitHub Bot commented on APEXMALHAR-1985:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/182#discussion_r52878459
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java ---
    @@ -359,24 +373,43 @@ public void emitTuples()
       {
         super.emitTuples();
         if (lastRowInBatch != null) {
    -      switch (primaryKeyColumnType.getName()) {
    -        case INT:
    -          startRow = lastRowInBatch.getInt(primaryKeyColumn);
    -          break;
    -        case COUNTER:
    -          startRow = lastRowInBatch.getLong(primaryKeyColumn);
    -          break;
    -        case FLOAT:
    -          startRow = lastRowInBatch.getFloat(primaryKeyColumn);
    -          break;
    -        case DOUBLE:
    -          startRow = lastRowInBatch.getDouble(primaryKeyColumn);
    -          break;
    -        default:
    -          throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName());
    -      }
    +      startRowToken = getPrimaryKeyToken(primaryKeyColumnType.getName());
         }
    +  }
     
    +  private Long getPrimaryKeyToken(DataType.Name primaryKeyDataType)
    +  {
    +    Object keyValue;
    +    switch (primaryKeyDataType) {
    +    case UUID:
    +      keyValue = lastRowInBatch.getUUID(primaryKeyColumn);
    +      break;
    +    case INT:
    +      keyValue = lastRowInBatch.getInt(primaryKeyColumn);
    +      break;
    +    case COUNTER:
    +      keyValue = lastRowInBatch.getLong(primaryKeyColumn);
    +      break;
    +    case FLOAT:
    +      keyValue = lastRowInBatch.getFloat(primaryKeyColumn);
    +      break;
    +    case DOUBLE:
    +      keyValue = lastRowInBatch.getDouble(primaryKeyColumn);
    +      break;
    +    default:
    +      throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName());
    +    }
    +    return fetchKeyTokenFromDB(keyValue);
    +  }
    +
    +  private Long fetchKeyTokenFromDB(Object keyValue)
    +  {
    +    PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY);
    +    BoundStatement boundStatement = new BoundStatement(statement);
    --- End diff --
    
    Maybe even this?
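The diff changes the resume point for the next query from the last row's raw key value (`startRow`) to that key's token (`startRowToken`). Below is a self-contained simulation of why this works. Rows are scanned in token order, so resuming strictly after the last row's token visits every row exactly once. This is a sketch under stated assumptions: the `token` function is a hypothetical mixing hash standing in for Cassandra's partitioner (it is not Murmur3), the in-memory list stands in for the table, and `page`/`scanAll` are illustrative helpers, not operator methods.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;

public class TokenPagingSketch {

  // Hypothetical stand-in for a partitioner's token function (NOT Murmur3).
  // Both steps are bijections on 64-bit values, so distinct keys get distinct
  // tokens, and token order differs from key order, as in Cassandra.
  static long token(int key) {
    long h = key * 0x9E3779B97F4A7C15L;
    return h ^ (h >>> 29);
  }

  // Simulated table: rows are kept (and therefore scanned) in token order.
  static List<Integer> tableInTokenOrder(List<Integer> keys) {
    List<Integer> sorted = new ArrayList<>(keys);
    sorted.sort(Comparator.comparingLong(TokenPagingSketch::token));
    return sorted;
  }

  // Simulates "select * from t where token(pk) > afterToken limit n".
  // A null afterToken means the first page (no WHERE clause).
  static List<Integer> page(List<Integer> table, Long afterToken, int limit) {
    List<Integer> result = new ArrayList<>();
    for (int key : table) {
      if (afterToken == null || token(key) > afterToken) {
        result.add(key);
        if (result.size() == limit) {
          break;
        }
      }
    }
    return result;
  }

  // Pages through the whole table, resuming each query from the token of the
  // last row returned by the previous query (the role of startRowToken).
  static List<Integer> scanAll(List<Integer> table, int limit) {
    List<Integer> seen = new ArrayList<>();
    Long startRowToken = null;
    while (true) {
      List<Integer> batch = page(table, startRowToken, limit);
      if (batch.isEmpty()) {
        return seen;
      }
      seen.addAll(batch);
      startRowToken = token(batch.get(batch.size() - 1));
    }
  }

  public static void main(String[] args) {
    List<Integer> keys = new ArrayList<>();
    for (int k = 1; k <= 20; k++) {
      keys.add(k);
    }
    List<Integer> scanned = scanAll(tableInTokenOrder(keys), 3);
    // prints "20 rows scanned, 20 distinct"
    System.out.println(scanned.size() + " rows scanned, " + new HashSet<>(scanned).size() + " distinct");
  }
}
```

Because each batch's last token is strictly greater than the previous resume token, the loop always makes progress and terminates. Comparing `token(pk)` against a raw key value instead would compare two unrelated quantities, which is the bug described in the issue.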


> Cassandra Input Operator: startRow set incorrectly
> --------------------------------------------------
>
>                 Key: APEXMALHAR-1985
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1985
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> The CassandraInputOperator scans the table starting from startRow. At the end of each query, it updates startRow to the last row scanned by that query.
> The query format is: select * from %t where token(%p) > %v limit %l
> Here %v is set to the primary key column value, but it should instead be set to the token value of the start row, i.e.
> %v = token(startRow).
> Reference: http://mail-archives.apache.org/mod_mbox/cassandra-user/201308.mbox/%3CCA+VSrLpeZqSv_85YqqR-99F-q2PMv2LAJ825WZrL7MEAf_jxFw@mail.gmail.com%3E
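A minimal sketch of filling the query template quoted in the issue, with %v bound to a token rather than a key value. `PagingQuerySketch`, `buildPagingQuery`, and the example table/column names are hypothetical. The operator's actual substitution code is not shown in this thread, and the token value used is arbitrary, not computed from any real key.

```java
public class PagingQuerySketch {

  // Query template from the issue. Placeholders: %t table, %p primary key
  // column, %v resume point, %l page size.
  static final String QUERY_TEMPLATE = "select * from %t where token(%p) > %v limit %l";

  // Fills the template. Per the issue, %v must be token(startRow), i.e. the
  // token of the last row scanned, not the primary key value itself.
  static String buildPagingQuery(String table, String primaryKeyColumn, long startRowToken, int limit) {
    return QUERY_TEMPLATE
        .replace("%t", table)
        .replace("%p", primaryKeyColumn)
        .replace("%v", Long.toString(startRowToken))
        .replace("%l", Integer.toString(limit));
  }

  public static void main(String[] args) {
    // prints "select * from users where token(id) > -1234567890123 limit 100"
    System.out.println(buildPagingQuery("users", "id", -1234567890123L, 100));
  }
}
```

Plain string substitution is used only to keep the sketch dependency-free. A real operator would more likely bind the token through a prepared statement.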



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)