Posted to dev@apex.apache.org by DT-Priyanka <gi...@git.apache.org> on 2016/04/01 11:21:39 UTC

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

GitHub user DT-Priyanka opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227

    APEXMALHAR-1966: Update Cassandra output operator

    Changes:
    1. Accept user queries (INSERT, UPDATE or DELETE).
    2. If fieldInfos is not set, derive default field info from the POJO fields.
    3. Add metrics.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/DT-Priyanka/incubator-apex-malhar APEXMALHAR-1966-cassandra-output

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/227.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #227
    
----
commit 2ec51896d9b9aeaadf0db9f2e0a9dca0f5fdcb5a
Author: Priyanka Gugale <pr...@datatorrent.com>
Date:   2016-04-01T09:20:21Z

    APEXMALHAR-1966: Update Cassandra output operator

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58710287
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---
    @@ -50,17 +49,21 @@
      * @since 2.1.0
      */
     @Evolving
    -public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object> implements Operator.ActivationListener<Context.OperatorContext>
    +public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object>
     {
    -  @NotNull
       private List<FieldInfo> fieldInfos;
    -  @NotNull
       private String tablename;
    +  private String query;
     
       protected final transient ArrayList<DataType> columnDataTypes;
       protected final transient ArrayList<Object> getters;
       protected transient Class<?> pojoClass;
     
    +  @AutoMetric
    +  private long recordsProcessed;
    --- End diff --
    
    Can you rename it? The name suggests the total number of records processed, but it only counts records that were processed successfully.
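A minimal sketch of the rename, assuming the intent is to count successful and failed writes separately. The names successfulRecords and WindowRecordCounters are hypothetical; in the operator both fields would carry @AutoMetric and the write would be super.processTuple(tuple), replaced here by plain Java so the example compiles on its own. Resetting in beginWindow mirrors the beginWindow override in this pull request, so each value is a per-window count.

```java
import java.util.function.Consumer;

/**
 * Hypothetical per-window counters. In the operator both fields would be
 * annotated with @AutoMetric; the annotation is omitted so this compiles
 * without the Apex API.
 */
class WindowRecordCounters
{
  // Tuples written without an exception in the current window.
  private long successfulRecords;
  // Tuples whose write threw a RuntimeException in the current window.
  private long errorRecords;

  /** Resets both counters so the reported values are per-window. */
  void beginWindow(long windowId)
  {
    successfulRecords = 0;
    errorRecords = 0;
  }

  /** Runs the write (a stand-in for super.processTuple) and counts the outcome. */
  void processTuple(Object tuple, Consumer<Object> write)
  {
    try {
      write.accept(tuple);
      successfulRecords++;
    } catch (RuntimeException e) {
      // Logging and error-port handling are left out of this sketch.
      errorRecords++;
    }
  }

  long getSuccessfulRecords()
  {
    return successfulRecords;
  }

  long getErrorRecords()
  {
    return errorRecords;
  }
}
```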



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58711090
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---
    @@ -172,25 +197,32 @@ public void deactivate()
       @Override
       protected PreparedStatement getUpdateCommand()
    --- End diff --
    
    Please add Javadoc describing the behaviour both when query is null and when it is not null.
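One possible shape for that Javadoc, written against a hypothetical static helper so it compiles on its own. The Supplier stands in for prepareStatementFromFieldsAndTableName(), and the claim that bind markers are filled positionally in fieldInfos order is an assumption about setStatementParameters, not something this diff confirms.

```java
import java.util.function.Supplier;

class UpdateCommandSketch
{
  /**
   * Returns the CQL text from which the update command is prepared.
   *
   * <p>If {@code query} is not null, it is used unchanged. It may be any
   * parameterized INSERT, UPDATE or DELETE statement; its bind markers are
   * assumed to be filled positionally from the POJO getters, in fieldInfos order.
   *
   * <p>If {@code query} is null, an INSERT statement covering every entry in
   * fieldInfos is generated for tablename; in that case tablename must be set.
   *
   * @param query user-supplied CQL, or null
   * @param generatedInsert builds the default INSERT; only called when query is null
   * @return the CQL text to prepare
   */
  static String chooseQuery(String query, Supplier<String> generatedInsert)
  {
    return query != null ? query : generatedInsert.get();
  }
}
```

Passing a Supplier keeps the default INSERT from being built (and tablename from being required) when a user query is configured.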



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58709506
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java ---
    @@ -301,6 +324,59 @@ public void testCassandraOutputOperator()
         outputOperator.getEventsInStore();
       }
     
    +  @Test
    +  public void testupdateQueryWithParameters()
    +  {
    +    TestOutputOperator outputOperator = setupForOutputOperatorTest();
    +    outputOperator.input.setup(tpc);
    +    outputOperator.setup(context);
    +    outputOperator.activate(context);
    +
    +    UUID id = UUID.randomUUID();
    +    TestPojo testPojo = new TestPojo(id, 20, "Laura", true, 10, 2.0, new HashSet<Integer>(), new ArrayList<Integer>(), null, new Date(System.currentTimeMillis()));
    +    // insert record
    +    outputOperator.beginWindow(0);
    +    outputOperator.input.process(testPojo);
    +    outputOperator.endWindow();
    +    Assert.assertEquals("rows in db", 1, outputOperator.getNumOfEventsInStore());
    +
    +    // update record
    --- End diff --
    
    Please move this into a separate test instead of resetting the operator.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58836522
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---
    @@ -50,17 +49,21 @@
      * @since 2.1.0
      */
     @Evolving
    -public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object> implements Operator.ActivationListener<Context.OperatorContext>
    +public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object>
     {
    -  @NotNull
       private List<FieldInfo> fieldInfos;
    -  @NotNull
       private String tablename;
    +  private String query;
     
       protected final transient ArrayList<DataType> columnDataTypes;
       protected final transient ArrayList<Object> getters;
       protected transient Class<?> pojoClass;
     
    +  @AutoMetric
    +  private long recordsProcessed;
    --- End diff --
    
    done.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58707219
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---
    @@ -260,6 +292,17 @@ protected Statement setStatementParameters(PreparedStatement updateCommand, Obje
         return boundStmnt;
       }
     
    +  @Override
    +  public void processTuple(Object tuple)
    +  {
    +    try {
    +      super.processTuple(tuple);
    +      recordsProcessed++;
    +    } catch (RuntimeException e) {
    +      //super.error.emit(tuple);
    --- End diff --
    
    Also, please log the exception.
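A sketch of logging the failure and forwarding the tuple, assuming the error port from #215 is available. java.util.logging and the two Consumer parameters are stand-ins chosen so the example is self-contained: store plays the role of super.processTuple(tuple) and errorPort the role of the error output port. The operator itself would presumably use its existing LOG and emit on its error port.

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

class ErrorHandlingWriter
{
  private static final Logger LOG = Logger.getLogger(ErrorHandlingWriter.class.getName());

  private final Consumer<Object> store;     // stands in for super.processTuple(tuple)
  private final Consumer<Object> errorPort; // stands in for the error output port

  ErrorHandlingWriter(Consumer<Object> store, Consumer<Object> errorPort)
  {
    this.store = store;
    this.errorPort = errorPort;
  }

  void processTuple(Object tuple)
  {
    try {
      store.accept(tuple);
    } catch (RuntimeException e) {
      // Keep the stack trace in the log so the failure can be diagnosed later.
      LOG.log(Level.WARNING, "Failed to write tuple " + tuple, e);
      // Hand the tuple downstream instead of silently dropping it.
      errorPort.accept(tuple);
    }
  }
}
```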



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58838149
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---
    @@ -260,6 +292,17 @@ protected Statement setStatementParameters(PreparedStatement updateCommand, Obje
         return boundStmnt;
       }
     
    +  @Override
    +  public void processTuple(Object tuple)
    +  {
    +    try {
    +      super.processTuple(tuple);
    +      recordsProcessed++;
    +    } catch (RuntimeException e) {
    +      //super.error.emit(tuple);
    --- End diff --
    
    Copying the relevant code from #215 to remove the dependency.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/227



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58705296
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---
    @@ -172,25 +197,32 @@ public void deactivate()
       @Override
       protected PreparedStatement getUpdateCommand()
       {
    +    PreparedStatement statement;
    +    if (query == null) {
    +      statement = prepareStatementFromFieldsAndTableName();
    +    } else {
    +      statement = store.getSession().prepare(query);
    +    }
    +    LOG.debug("Statement is: " + statement.getQueryString());
    +    return statement;
    +  }
    +
    +  private PreparedStatement prepareStatementFromFieldsAndTableName()
    +  {
         StringBuilder queryfields = new StringBuilder();
         StringBuilder values = new StringBuilder();
    -    for (FieldInfo fieldInfo: fieldInfos) {
    +    for (FieldInfo fieldInfo : fieldInfos) {
           if (queryfields.length() == 0) {
             queryfields.append(fieldInfo.getColumnName());
             values.append("?");
    -      }
    -      else {
    +      } else {
             queryfields.append(",").append(fieldInfo.getColumnName());
             values.append(",").append("?");
           }
         }
    -    String statement
    -            = "INSERT INTO " + store.keyspace + "."
    -            + tablename
    -            + " (" + queryfields.toString() + ") "
    -            + "VALUES (" + values.toString() + ");";
    -    LOG.debug("statement is {}", statement);
    +    String statement = "INSERT INTO " + store.keyspace + "." + tablename + " (" + queryfields.toString() + ") " + "VALUES (" + values.toString() + ");";
    --- End diff --
    
    Please add a check for a null tablename, since `@NotNull` has been removed.
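A sketch of the generated INSERT with the requested check, written as a hypothetical static helper. It produces the same string shape as the concatenation in the diff but fails fast with an IllegalArgumentException when tablename is missing; String.join and Collections.nCopies replace the two StringBuilder loops.

```java
import java.util.Collections;
import java.util.List;

class InsertStatementBuilder
{
  /** Builds "INSERT INTO keyspace.table (c1,c2) VALUES (?,?);" for the given columns. */
  static String buildInsert(String keyspace, String tablename, List<String> columnNames)
  {
    if (tablename == null || tablename.isEmpty()) {
      throw new IllegalArgumentException("tablename must be set when no query is provided");
    }
    if (columnNames == null || columnNames.isEmpty()) {
      throw new IllegalArgumentException("at least one column is required");
    }
    String columns = String.join(",", columnNames);
    // One "?" bind marker per column.
    String markers = String.join(",", Collections.nCopies(columnNames.size(), "?"));
    return "INSERT INTO " + keyspace + "." + tablename + " (" + columns + ") VALUES (" + markers + ");";
  }
}
```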



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58710629
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---
    @@ -102,20 +92,31 @@ public CassandraPOJOOutputOperator()
       }
     
       @Override
    +  public void beginWindow(long windowId)
    +  {
    +    super.beginWindow(windowId);
    +    recordsProcessed = 0;
    +    errorRecords = 0;
    +  }
    +
    +  @Override
       public void activate(Context.OperatorContext context)
       {
    +    // clear if it had anything
    +    columnDataTypes.clear();
    --- End diff --
    
    Why is this necessary?



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58703820
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---
    @@ -162,6 +163,30 @@ public void activate(Context.OperatorContext context)
           }
           getters.add(getter);
         }
    +    super.activate(context);
    +  }
    +
    +  private void populateFieldInfosFromPojo(ColumnDefinitions rsMetaData)
    +  {
    +    fieldInfos = Lists.newArrayList();
    +    Field[] fields = pojoClass.getDeclaredFields();
    +    for (int i = 0; i < rsMetaData.size(); i++) {
    +      String columnName = rsMetaData.getName(i);
    +      String pojoField = getMatchingField(fields, columnName);
    +      if (pojoField != null && pojoField.length() != 0) {
    --- End diff --
    
    What happens if pojoField is null? Please check whether this causes problems when no matching field is found.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58708886
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java ---
    @@ -301,6 +324,59 @@ public void testCassandraOutputOperator()
         outputOperator.getEventsInStore();
       }
     
    +  @Test
    +  public void testupdateQueryWithParameters()
    +  {
    +    TestOutputOperator outputOperator = setupForOutputOperatorTest();
    +    outputOperator.input.setup(tpc);
    --- End diff --
    
    Set up the port after the operator.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58707688
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java ---
    @@ -269,15 +269,38 @@ public void testCassandraOutputOperator()
         fieldInfos.add(new FieldInfo("set1", "set1", null));
         fieldInfos.add(new FieldInfo("test", "test", null));
     
    -    outputOperator.setStore(transactionalStore);
         outputOperator.setFieldInfos(fieldInfos);
    +    outputOperator.input.setup(tpc);
    --- End diff --
    
    I think the port should be set up after the operator.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58708006
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java ---
    @@ -269,15 +269,38 @@ public void testCassandraOutputOperator()
         fieldInfos.add(new FieldInfo("set1", "set1", null));
         fieldInfos.add(new FieldInfo("test", "test", null));
     
    -    outputOperator.setStore(transactionalStore);
         outputOperator.setFieldInfos(fieldInfos);
    +    outputOperator.input.setup(tpc);
         outputOperator.setup(context);
    +    outputOperator.activate(context);
     
    -    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
    -    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPojo.class);
    -    TestPortContext tpc = new TestPortContext(portAttributes);
    +    List<TestPojo> events = Lists.newArrayList();
    +    for (int i = 0; i < 3; i++) {
    +      Set<Integer> set = new HashSet<Integer>();
    +      set.add(i);
    +      List<Integer> list = new ArrayList<Integer>();
    +      list.add(i);
    +      Map<String, Integer> map = new HashMap<String, Integer>();
    +      map.put("key" + i, i);
    +      events.add(new TestPojo(UUID.randomUUID(), i, "abclast", true, i, 2.0, set, list, map, new Date(System.currentTimeMillis())));
    +    }
    +
    +    outputOperator.beginWindow(0);
    +    for (TestPojo event : events) {
    +      outputOperator.input.process(event);
    +    }
    +    outputOperator.endWindow();
    +
    +    Assert.assertEquals("rows in db", 3, outputOperator.getNumOfEventsInStore());
    +    outputOperator.getEventsInStore();
    +  }
     
    +  @Test
    +  public void testPopulateFieldInfo()
    +  {
    +    TestOutputOperator outputOperator = setupForOutputOperatorTest();
         outputOperator.input.setup(tpc);
    +    outputOperator.setup(context);
    --- End diff --
    
    Check the setup order of the input port and the operator.
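The same ordering comment came up in three tests, so one option is a shared helper that fixes the order in one place. The interface and class names below are hypothetical stand-ins for the calls in CassandraOperatorTest (setup(context), input.setup(tpc), activate(context), input.process(tuple)); the order follows the reviewer's suggestion of setting up the operator before its input port.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal view of the lifecycle calls the Cassandra tests make. */
interface OutputOperatorUnderTest
{
  void setup();               // stands in for outputOperator.setup(context)
  void setupInputPort();      // stands in for outputOperator.input.setup(tpc)
  void activate();            // stands in for outputOperator.activate(context)
  void beginWindow(long windowId);
  void process(Object tuple); // stands in for outputOperator.input.process(tuple)
  void endWindow();
}

final class OperatorTestSupport
{
  private OperatorTestSupport()
  {
  }

  /** Starts the operator in one fixed order (operator, then port) and runs a single window. */
  static void startAndRunWindow(OutputOperatorUnderTest op, List<?> tuples)
  {
    op.setup();
    op.setupInputPort();
    op.activate();
    op.beginWindow(0);
    for (Object tuple : tuples) {
      op.process(tuple);
    }
    op.endWindow();
  }
}

/** Test double that records the order of lifecycle calls. */
class RecordingOperator implements OutputOperatorUnderTest
{
  final List<String> calls = new ArrayList<>();

  public void setup() { calls.add("setup"); }
  public void setupInputPort() { calls.add("input.setup"); }
  public void activate() { calls.add("activate"); }
  public void beginWindow(long windowId) { calls.add("beginWindow"); }
  public void process(Object tuple) { calls.add("process"); }
  public void endWindow() { calls.add("endWindow"); }
}
```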



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58835817
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---
    @@ -260,6 +292,17 @@ protected Statement setStatementParameters(PreparedStatement updateCommand, Obje
         return boundStmnt;
       }
     
    +  @Override
    +  public void processTuple(Object tuple)
    +  {
    +    try {
    +      super.processTuple(tuple);
    +      recordsProcessed++;
    +    } catch (RuntimeException e) {
    +      //super.error.emit(tuple);
    --- End diff --
    
    In #215 you added an error port to AbstractTransactionableStoreOutputOperator, which is the parent of this operator. I will update my pull request once I rebase onto the latest changes.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by bhupeshchawda <gi...@git.apache.org>.
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58705647
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---
    @@ -260,6 +292,17 @@ protected Statement setStatementParameters(PreparedStatement updateCommand, Obje
         return boundStmnt;
       }
     
    +  @Override
    +  public void processTuple(Object tuple)
    +  {
    +    try {
    +      super.processTuple(tuple);
    +      recordsProcessed++;
    +    } catch (RuntimeException e) {
    +      //super.error.emit(tuple);
    --- End diff --
    
    Are you not emitting the tuple on the error port?



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58837320
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---
    @@ -162,6 +163,30 @@ public void activate(Context.OperatorContext context)
           }
           getters.add(getter);
         }
    +    super.activate(context);
    +  }
    +
    +  private void populateFieldInfosFromPojo(ColumnDefinitions rsMetaData)
    +  {
    +    fieldInfos = Lists.newArrayList();
    +    Field[] fields = pojoClass.getDeclaredFields();
    +    for (int i = 0; i < rsMetaData.size(); i++) {
    +      String columnName = rsMetaData.getName(i);
    +      String pojoField = getMatchingField(fields, columnName);
    +      if (pojoField != null && pojoField.length() != 0) {
    --- End diff --
    
    That field will be ignored (i.e. it won't be written to the DB). I will log an error message for it; throwing an exception doesn't seem right here. What's your opinion?
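A sketch of the behaviour described here: columns with no matching POJO field are logged and skipped instead of causing an exception. The case-insensitive name match and the names PojoColumnMatcher, findMatchingField and SamplePojo are assumptions for illustration; the PR's getMatchingField may match differently.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical example POJO. */
class SamplePojo
{
  private int id;
  private String name;
}

class PojoColumnMatcher
{
  private static final Logger LOG = Logger.getLogger(PojoColumnMatcher.class.getName());

  /** Returns the declared field name equal to columnName ignoring case, or null if none matches. */
  static String findMatchingField(Field[] fields, String columnName)
  {
    for (Field field : fields) {
      if (field.getName().equalsIgnoreCase(columnName)) {
        return field.getName();
      }
    }
    return null;
  }

  /** Maps each column to a POJO field; unmatched columns are logged and left out. */
  static Map<String, String> match(Class<?> pojoClass, List<String> columnNames)
  {
    Field[] fields = pojoClass.getDeclaredFields();
    Map<String, String> columnToField = new LinkedHashMap<>();
    for (String column : columnNames) {
      String pojoField = findMatchingField(fields, column);
      if (pojoField == null) {
        LOG.warning("No field in " + pojoClass.getName() + " matches column '" + column + "'; it will not be written");
        continue;
      }
      columnToField.put(column, pojoField);
    }
    return columnToField;
  }
}
```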



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1966: Update casand...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58836661
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---
    @@ -102,20 +92,31 @@ public CassandraPOJOOutputOperator()
       }
     
       @Override
    +  public void beginWindow(long windowId)
    +  {
    +    super.beginWindow(windowId);
    +    recordsProcessed = 0;
    +    errorRecords = 0;
    +  }
    +
    +  @Override
       public void activate(Context.OperatorContext context)
       {
    +    // clear if it had anything
    +    columnDataTypes.clear();
    --- End diff --
    
    It was required because in the same test I first run an insert query and then an update. Let me see if I can refactor that; in any case, the product won't support changing the query and fieldInfos on the fly.
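A minimal sketch of why the clear matters if activate() runs more than once on the same instance: without it, the second call appends a second copy of every entry. The class and the tableColumnTypes input are hypothetical; in the operator, getters would presumably need the same treatment as columnDataTypes.

```java
import java.util.ArrayList;
import java.util.List;

class ReactivationSketch
{
  // Derived state rebuilt on every activation (mirrors columnDataTypes).
  private final List<String> columnDataTypes = new ArrayList<>();
  // Hypothetical input; in the operator this would come from the table metadata.
  private List<String> tableColumnTypes = new ArrayList<>();

  void setTableColumnTypes(List<String> types)
  {
    this.tableColumnTypes = types;
  }

  void activate()
  {
    // Drop whatever a previous activation added; without this a second
    // activate() would leave every column type in the list twice.
    columnDataTypes.clear();
    columnDataTypes.addAll(tableColumnTypes);
  }

  List<String> getColumnDataTypes()
  {
    return columnDataTypes;
  }
}
```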

