You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by is...@apache.org on 2015/09/18 22:37:19 UTC

[1/2] incubator-apex-malhar git commit: Cassandra integration with Schema Support

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 f7f9085e8 -> 9194a72c3


Cassandra integration with Schema Support


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/effc8385
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/effc8385
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/effc8385

Branch: refs/heads/devel-3
Commit: effc8385af173448f0dfc9e8e2243b5fd728acef
Parents: f7f9085
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Sun Aug 2 21:27:59 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Sep 18 13:02:44 2015 -0700

----------------------------------------------------------------------
 .../AbstractCassandraInputOperator.java         |   9 +-
 .../cassandra/CassandraPOJOInputOperator.java   | 276 +++++++++----------
 .../cassandra/CassandraPOJOOutputOperator.java  | 145 +++++-----
 .../cassandra/CassandraOperatorTest.java        | 159 +++++------
 4 files changed, 293 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/effc8385/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 0c14a0e..7560768 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -99,14 +99,14 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
   public void emitTuples()
   {
     String query = queryToRetrieveData();
-    logger.debug(String.format("select statement: %s", query));
+    logger.debug("select statement: {}", query);
 
     try {
       ResultSet result = store.getSession().execute(query);
       if (!result.isExhausted()) {
         for (Row row : result) {
           T tuple = getTuple(row);
-          outputPort.emit(tuple);
+          emit(tuple);
         }
       } else {
         // No rows available wait for some time before retrying so as to not continuously slam the database
@@ -118,4 +118,9 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
       DTThrowable.rethrow(ex);
     }
   }
+
+  protected void emit(T tuple)
+  {
+    outputPort.emit(tuple);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/effc8385/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index 9d8e356..13f4dd0 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -21,17 +21,23 @@ import java.util.*;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.datastax.driver.core.ColumnDefinitions;
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.Row;
 
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 
+import com.datatorrent.lib.util.FieldInfo;
 import com.datatorrent.lib.util.PojoUtils;
 import com.datatorrent.lib.util.PojoUtils.*;
-import com.datatorrent.api.Context.OperatorContext;
 
 /**
  * <p>
@@ -49,29 +55,38 @@ import com.datatorrent.api.Context.OperatorContext;
  * @since 3.0.0
  */
 @Evolving
-public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object>
+public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext>
 {
   @NotNull
-  private List<String> columns;
-  private final transient List<DataType> columnDataTypes;
+  private List<FieldInfo> fieldInfos;
   private Number startRow = 0;
   @NotNull
-  private List<String> expressions;
-  @NotNull
   private String tablename;
-  private final transient List<Object> setters;
   @NotNull
   private String query;
-
-  private transient Class<?> objectClass = null;
   @NotNull
-  protected String primaryKeyColumn;
-  protected transient DataType primaryKeyColumnType;
-  private transient Row lastRowInBatch;
+  private String primaryKeyColumn;
 
   @Min(1)
   private int limit = 10;
 
+  private transient DataType primaryKeyColumnType;
+  private transient Row lastRowInBatch;
+
+  protected final transient List<Object> setters;
+  protected final transient List<DataType> columnDataTypes;
+  protected transient Class<?> pojoClass;
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
   /*
    * Number of records to be fetched in one time from cassandra table.
    */
@@ -114,25 +129,6 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   }
 
   /*
-   * POJO class which is generated as output from this operator.
-   * Example:
-   * public class TestPOJO{ int intfield; public int getInt(){} public void setInt(){} }
-   * outputClass = TestPOJO
-   * POJOs will be generated on fly in later implementation.
-   */
-  private String outputClass;
-
-  public String getOutputClass()
-  {
-    return outputClass;
-  }
-
-  public void setOutputClass(String outputClass)
-  {
-    this.outputClass = outputClass;
-  }
-
-  /*
    * Parameterized query with parameters such as %t for table name , %p for primary key, %s for start value and %l for limit.
    * Example of retrieveQuery:
    * select * from %t where token(%p) > %s limit %l;
@@ -147,31 +143,25 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
     this.query = query.replace("%t", tablename);
   }
 
-  /*
-   * An ArrayList of Java expressions that will yield the cassandra column value to be set in output object.
-   * Each expression corresponds to one column in the Cassandra table.
+  /**
+   * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
    */
-  public List<String> getExpressions()
+  public List<FieldInfo> getFieldInfos()
   {
-    return expressions;
+    return fieldInfos;
   }
 
-  public void setExpressions(List<String> expressions)
-  {
-    this.expressions = expressions;
-  }
-
-  /*
-   * List of column names specified by User in the same order as expressions for the particular fields.
+  /**
+   * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/>
+   * The value read from the column named by fieldInfo.columnName is assigned to fieldInfo.pojoFieldExpression.
+   *
+   * @description $[].columnName name of the database column
+   * @description $[].pojoFieldExpression pojo field name or expression
+   * @useSchema $[].pojoFieldExpression outputPort.fields[].name
    */
-  public List<String> getColumns()
+  public void setFieldInfos(List<FieldInfo> fieldInfos)
   {
-    return columns;
-  }
-
-  public void setColumns(List<String> columns)
-  {
-    this.columns = columns;
+    this.fieldInfos = fieldInfos;
   }
 
   /*
@@ -196,87 +186,72 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   }
 
   @Override
-  public void setup(OperatorContext context)
+  public void activate(OperatorContext context)
   {
-    super.setup(context);
-    if (setters.isEmpty()) {
-      try {
-        // This code will be replaced after integration of creating POJOs on the fly utility.
-        objectClass = Class.forName(outputClass);
-      }
-      catch (ClassNotFoundException ex) {
-        throw new RuntimeException(ex);
-      }
+    com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1);
+    ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
 
-      com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1);
-      ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
-
-      primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
-       if(query.contains("%p"))
-       {
-          query = query.replace("%p", primaryKeyColumn);
-       }
-       if(query.contains("%l"))
-       {
-         query = query.replace("%l", limit+"");
-       }
-
-      logger.debug("query is {}",query);
-
-      //In case columns is a subset
-      int columnSize = columns.size();
-      for (int i = 0; i < columnSize; i++) {
-        // Get the designated column's data type.
-        DataType type = rsMetaData.getType(columns.get(i));
-        columnDataTypes.add(type);
-        Object setter;
-        final String setterExpr = expressions.get(i);
-        switch (type.getName()) {
-          case ASCII:
-          case TEXT:
-          case VARCHAR:
-            setter = PojoUtils.createSetter(objectClass, setterExpr, String.class);
-            break;
-          case BOOLEAN:
-            setter = PojoUtils.createSetterBoolean(objectClass, setterExpr);
-            break;
-          case INT:
-            setter = PojoUtils.createSetterInt(objectClass, setterExpr);
-            break;
-          case BIGINT:
-          case COUNTER:
-            setter = PojoUtils.createSetterLong(objectClass, setterExpr);
-            break;
-          case FLOAT:
-            setter = PojoUtils.createSetterFloat(objectClass, setterExpr);
-            break;
-          case DOUBLE:
-            setter = PojoUtils.createSetterDouble(objectClass, setterExpr);
-            break;
-          case DECIMAL:
-            setter = PojoUtils.createSetter(objectClass, setterExpr, BigDecimal.class);
-            break;
-          case SET:
-            setter = PojoUtils.createSetter(objectClass, setterExpr, Set.class);
-            break;
-          case MAP:
-            setter = PojoUtils.createSetter(objectClass, setterExpr, Map.class);
-            break;
-          case LIST:
-            setter = PojoUtils.createSetter(objectClass, setterExpr, List.class);
-            break;
-          case TIMESTAMP:
-            setter = PojoUtils.createSetter(objectClass, setterExpr, Date.class);
-            break;
-          case UUID:
-            setter = PojoUtils.createSetter(objectClass, setterExpr, UUID.class);
-            break;
-          default:
-            setter = PojoUtils.createSetter(objectClass, setterExpr, Object.class);
-            break;
-        }
-        setters.add(setter);
+    primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
+    if (query.contains("%p")) {
+      query = query.replace("%p", primaryKeyColumn);
+    }
+    if (query.contains("%l")) {
+      query = query.replace("%l", limit + "");
+    }
+
+    LOG.debug("query is {}", query);
+
+    for (FieldInfo fieldInfo : fieldInfos) {
+      // Get the designated column's data type.
+      DataType type = rsMetaData.getType(fieldInfo.getColumnName());
+      columnDataTypes.add(type);
+      Object setter;
+      final String setterExpr = fieldInfo.getPojoFieldExpression();
+      switch (type.getName()) {
+        case ASCII:
+        case TEXT:
+        case VARCHAR:
+          setter = PojoUtils.createSetter(pojoClass, setterExpr, String.class);
+          break;
+        case BOOLEAN:
+          setter = PojoUtils.createSetterBoolean(pojoClass, setterExpr);
+          break;
+        case INT:
+          setter = PojoUtils.createSetterInt(pojoClass, setterExpr);
+          break;
+        case BIGINT:
+        case COUNTER:
+          setter = PojoUtils.createSetterLong(pojoClass, setterExpr);
+          break;
+        case FLOAT:
+          setter = PojoUtils.createSetterFloat(pojoClass, setterExpr);
+          break;
+        case DOUBLE:
+          setter = PojoUtils.createSetterDouble(pojoClass, setterExpr);
+          break;
+        case DECIMAL:
+          setter = PojoUtils.createSetter(pojoClass, setterExpr, BigDecimal.class);
+          break;
+        case SET:
+          setter = PojoUtils.createSetter(pojoClass, setterExpr, Set.class);
+          break;
+        case MAP:
+          setter = PojoUtils.createSetter(pojoClass, setterExpr, Map.class);
+          break;
+        case LIST:
+          setter = PojoUtils.createSetter(pojoClass, setterExpr, List.class);
+          break;
+        case TIMESTAMP:
+          setter = PojoUtils.createSetter(pojoClass, setterExpr, Date.class);
+          break;
+        case UUID:
+          setter = PojoUtils.createSetter(pojoClass, setterExpr, UUID.class);
+          break;
+        default:
+          setter = PojoUtils.createSetter(pojoClass, setterExpr, Object.class);
+          break;
       }
+      setters.add(setter);
     }
   }
 
@@ -285,23 +260,19 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   public Object getTuple(Row row)
   {
     lastRowInBatch = row;
-    Object obj = null;
-    final int size = columnDataTypes.size();
+    Object obj;
 
     try {
       // This code will be replaced after integration of creating POJOs on the fly utility.
-      obj = objectClass.newInstance();
-    }
-    catch (InstantiationException ex) {
-      throw new RuntimeException(ex);
+      obj = pojoClass.newInstance();
     }
-    catch (IllegalAccessException ex) {
+    catch (InstantiationException | IllegalAccessException ex) {
       throw new RuntimeException(ex);
     }
 
-    for (int i = 0; i < size; i++) {
+    for (int i = 0; i < columnDataTypes.size(); i++) {
       DataType type = columnDataTypes.get(i);
-      String columnName = columns.get(i);
+      String columnName = fieldInfos.get(i).getColumnName();
       switch (type.getName()) {
         case UUID:
           final UUID id = row.getUUID(columnName);
@@ -370,16 +341,10 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   @Override
   public String queryToRetrieveData()
   {
-    String parameterizedQuery;
-    if(query.contains("%v"))
-    {
-      parameterizedQuery = query.replace("%v", startRow+"");
+    if (query.contains("%v")) {
+      return query.replace("%v", startRow + "");
     }
-    else
-    {
-      parameterizedQuery = query;
-    }
-    return parameterizedQuery;
+    return query;
   }
 
 
@@ -411,5 +376,16 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
 
   }
 
-  private static final Logger logger = LoggerFactory.getLogger(CassandraPOJOInputOperator.class);
+  @Override
+  protected void emit(Object tuple)
+  {
+    outputPort.emit(tuple);
+  }
+
+  @Override
+  public void deactivate()
+  {
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraPOJOInputOperator.class);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/effc8385/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
index 61ef26c..bc4f97e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
@@ -15,16 +15,6 @@
  */
 package com.datatorrent.contrib.cassandra;
 
-import com.datastax.driver.core.*;
-import com.datastax.driver.core.exceptions.DriverException;
-import com.datatorrent.lib.util.PojoUtils;
-import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
-import com.datatorrent.lib.util.PojoUtils.GetterDouble;
-import com.datatorrent.lib.util.PojoUtils.GetterFloat;
-import com.datatorrent.lib.util.PojoUtils.GetterInt;
-import com.datatorrent.lib.util.PojoUtils.GetterLong;
-import com.datatorrent.lib.util.PojoUtils.Getter;
-
 import java.math.BigDecimal;
 import java.util.*;
 
@@ -34,6 +24,18 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.DriverException;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.*;
+
 /**
  * <p>
  * CassandraOutputOperator class.</p>
@@ -45,46 +47,36 @@ import org.slf4j.LoggerFactory;
  * @since 2.1.0
  */
 @Evolving
-public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object>
+public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object> implements Operator.ActivationListener<Context.OperatorContext>
 {
-  private static final long serialVersionUID = 201506181024L;
   @NotNull
-  private ArrayList<String> columns;
-  private final transient ArrayList<DataType> columnDataTypes;
+  private List<FieldInfo> fieldInfos;
   @NotNull
-  private ArrayList<String> expressions;
-  private final transient ArrayList<Object> getters;
-
-  /*
-   * An ArrayList of Java expressions that will yield the field value from the POJO.
-   * Each expression corresponds to one column in the Cassandra table.
-   */
-  public ArrayList<String> getExpressions()
-  {
-    return expressions;
-  }
+  private String tablename;
 
-  public void setExpressions(ArrayList<String> expressions)
-  {
-    this.expressions = expressions;
-  }
+  protected final transient ArrayList<DataType> columnDataTypes;
+  protected final transient ArrayList<Object> getters;
+  protected transient Class<?> pojoClass;
 
-  /*
-   * An ArrayList of Columns in the Cassandra Table.
+  /**
+   * The input port on which tuples are received for writing.
    */
-  public ArrayList<String> getColumns()
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
   {
-    return columns;
-  }
-
-  public void setColumns(ArrayList<String> columns)
-  {
-    this.columns = columns;
-  }
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
 
-  @NotNull
-  private String tablename;
+    @Override
+    public void process(Object tuple)
+    {
+      CassandraPOJOOutputOperator.super.input.process(tuple);
+    }
 
+  };
 
   /*
    * Tablename in cassandra.
@@ -106,63 +98,63 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
     getters = new ArrayList<Object>();
   }
 
-  public void processFirstTuple(Object tuple)
+  @Override
+  public void activate(Context.OperatorContext context)
   {
     com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename);
 
     final ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
 
     final int numberOfColumns = rsMetaData.size();
-    final Class<?> fqcn = tuple.getClass();
 
     for (int i = 0; i < numberOfColumns; i++) {
       // get the designated column's data type.
       final DataType type = rsMetaData.getType(i);
       columnDataTypes.add(type);
       final Object getter;
-      final String getterExpr = expressions.get(i);
+      final String getterExpr = fieldInfos.get(i).getPojoFieldExpression();
       switch (type.getName()) {
         case ASCII:
         case TEXT:
         case VARCHAR:
-          getter = PojoUtils.createGetter(fqcn, getterExpr, String.class);
+          getter = PojoUtils.createGetter(pojoClass, getterExpr, String.class);
           break;
         case BOOLEAN:
-          getter = PojoUtils.createGetterBoolean(fqcn, getterExpr);
+          getter = PojoUtils.createGetterBoolean(pojoClass, getterExpr);
           break;
         case INT:
-          getter = PojoUtils.createGetterInt(fqcn, getterExpr);
+          getter = PojoUtils.createGetterInt(pojoClass, getterExpr);
           break;
         case BIGINT:
         case COUNTER:
-          getter = PojoUtils.createGetterLong(fqcn, getterExpr);
+          getter = PojoUtils.createGetterLong(pojoClass, getterExpr);
           break;
         case FLOAT:
-          getter = PojoUtils.createGetterFloat(fqcn, getterExpr);
+          getter = PojoUtils.createGetterFloat(pojoClass, getterExpr);
           break;
         case DOUBLE:
-          getter = PojoUtils.createGetterDouble(fqcn, getterExpr);
+          getter = PojoUtils.createGetterDouble(pojoClass, getterExpr);
           break;
         case DECIMAL:
-          getter = PojoUtils.createGetter(fqcn, getterExpr, BigDecimal.class);
+          getter = PojoUtils.createGetter(pojoClass, getterExpr, BigDecimal.class);
           break;
         case SET:
-          getter = PojoUtils.createGetter(fqcn, getterExpr, Set.class);
+          getter = PojoUtils.createGetter(pojoClass, getterExpr, Set.class);
           break;
         case MAP:
-          getter = PojoUtils.createGetter(fqcn, getterExpr, Map.class);
+          getter = PojoUtils.createGetter(pojoClass, getterExpr, Map.class);
           break;
         case LIST:
-          getter = PojoUtils.createGetter(fqcn, getterExpr, List.class);
+          getter = PojoUtils.createGetter(pojoClass, getterExpr, List.class);
           break;
         case TIMESTAMP:
-          getter = PojoUtils.createGetter(fqcn, getterExpr, Date.class);
+          getter = PojoUtils.createGetter(pojoClass, getterExpr, Date.class);
           break;
         case UUID:
-          getter = PojoUtils.createGetter(fqcn, getterExpr, UUID.class);
+          getter = PojoUtils.createGetter(pojoClass, getterExpr, UUID.class);
           break;
         default:
-          getter = PojoUtils.createGetter(fqcn, getterExpr, Object.class);
+          getter = PojoUtils.createGetter(pojoClass, getterExpr, Object.class);
           break;
       }
       getters.add(getter);
@@ -170,17 +162,22 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
   }
 
   @Override
+  public void deactivate()
+  {
+  }
+
+  @Override
   protected PreparedStatement getUpdateCommand()
   {
-    StringBuilder queryfields = new StringBuilder("");
-    StringBuilder values = new StringBuilder("");
-    for (String column: columns) {
+    StringBuilder queryfields = new StringBuilder();
+    StringBuilder values = new StringBuilder();
+    for (FieldInfo fieldInfo: fieldInfos) {
       if (queryfields.length() == 0) {
-        queryfields.append(column);
+        queryfields.append(fieldInfo.getColumnName());
         values.append("?");
       }
       else {
-        queryfields.append(",").append(column);
+        queryfields.append(",").append(fieldInfo.getColumnName());
         values.append(",").append("?");
       }
     }
@@ -197,9 +194,6 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
   @SuppressWarnings("unchecked")
   protected Statement setStatementParameters(PreparedStatement updateCommand, Object tuple) throws DriverException
   {
-    if (getters.isEmpty()) {
-      processFirstTuple(tuple);
-    }
     final BoundStatement boundStmnt = new BoundStatement(updateCommand);
     final int size = columnDataTypes.size();
     for (int i = 0; i < size; i++) {
@@ -263,5 +257,26 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
     return boundStmnt;
   }
 
+  /**
+   * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
+   */
+  public List<FieldInfo> getFieldInfos()
+  {
+    return fieldInfos;
+  }
+
+  /**
+   * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/>
+   * The value read from fieldInfo.pojoFieldExpression is written to the column named by fieldInfo.columnName.
+   *
+   * @description $[].columnName name of the database column
+   * @description $[].pojoFieldExpression pojo field name or expression
+   * @useSchema $[].pojoFieldExpression input.fields[].name
+   */
+  public void setFieldInfos(List<FieldInfo> fieldInfos)
+  {
+    this.fieldInfos = fieldInfos;
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(CassandraPOJOOutputOperator.class);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/effc8385/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index 6f5f75c..ab7c91e 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -17,13 +17,19 @@ package com.datatorrent.contrib.cassandra;
 
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.exceptions.DriverException;
+import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
+
+import com.datatorrent.lib.helper.TestPortContext;
+import com.datatorrent.lib.util.FieldInfo;
 import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.google.common.collect.Lists;
 import java.util.*;
+
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -62,6 +68,7 @@ public class CassandraOperatorTest
   @BeforeClass
   public static void setup()
   {
+    @SuppressWarnings("UnusedDeclaration") Class<?> clazz = org.codehaus.janino.CompilerFactory.class;
     try {
       cluster = Cluster.builder()
               .addContactPoint(NODE).build();
@@ -100,8 +107,6 @@ public class CassandraOperatorTest
 
   private static class TestOutputOperator extends CassandraPOJOOutputOperator
   {
-    private static final long serialVersionUID = 201506181038L;
-
     public long getNumOfEventsInStore()
     {
       String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
@@ -117,65 +122,52 @@ public class CassandraOperatorTest
     {
       String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
       ResultSet resultSetRecords = session.execute(recordsQuery);
-      int count = 0;
       for (Row row: resultSetRecords) {
-        LOG.debug("Boolean value is {}", row.getBool("test"));
-        Assert.assertEquals(true, row.getBool("test"));
-        LOG.debug("lastname returned is {}", row.getString("lastname"));
-        Assert.assertEquals("abclast", row.getString("lastname"));
-        LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
-        Assert.assertEquals("Double value is", 2.0, row.getDouble("doubleValue"), 2);
-        LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
-        LOG.debug("age returned is {}", row.getInt("age"));
-        LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
-        LOG.debug("list returned is {}", row.getList("list1", Integer.class));
-        LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
-        LOG.debug("date returned is {}", row.getDate("last_visited"));
-        Assert.assertNotEquals(new Date(System.currentTimeMillis()), row.getDate("last_visited"));
-        if (count == 0) {
-          Assert.assertEquals(2, row.getInt("age"));
-          Assert.assertEquals(2.0, row.getFloat("floatValue"), 2);
+        int age = row.getInt("age");
+        Assert.assertEquals("check boolean", true, row.getBool("test"));
+        Assert.assertEquals("check last name", "abclast", row.getString("lastname"));
+        Assert.assertEquals("check double", 2.0, row.getDouble("doubleValue"), 2);
+        LOG.debug("age returned is {}", age);
+        Assert.assertNotEquals("check date", new Date(System.currentTimeMillis()), row.getDate("last_visited"));
+        if (age == 2) {
+          Assert.assertEquals("check float", 2.0, row.getFloat("floatValue"), 2);
           Set<Integer> set = new HashSet<Integer>();
           List<Integer> list = new ArrayList<Integer>();
           Map<String, Integer> map = new HashMap<String, Integer>();
           set.add(2);
           list.add(2);
           map.put("key2", 2);
-          Assert.assertEquals(set, row.getSet("set1", Integer.class));
-          Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
-          Assert.assertEquals(list, row.getList("list1", Integer.class));
+
+          Assert.assertEquals("check set", set, row.getSet("set1", Integer.class));
+          Assert.assertEquals("check map", map, row.getMap("map1", String.class, Integer.class));
+          Assert.assertEquals("check list", list, row.getList("list1", Integer.class));
         }
-        if (count == 1) {
-          Assert.assertEquals(0, row.getInt("age"));
-          Assert.assertEquals(0.0, row.getFloat("floatValue"), 2);
+        if (age == 0) {
+          Assert.assertEquals("check float", 0.0, row.getFloat("floatValue"), 2);
           Set<Integer> set = new HashSet<Integer>();
           List<Integer> list = new ArrayList<Integer>();
           Map<String, Integer> map = new HashMap<String, Integer>();
           set.add(0);
           list.add(0);
           map.put("key0", 0);
-          Assert.assertEquals(set, row.getSet("set1", Integer.class));
-          Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
-          Assert.assertEquals(list, row.getList("list1", Integer.class));
+          Assert.assertEquals("check set", set, row.getSet("set1", Integer.class));
+          Assert.assertEquals("check map", map, row.getMap("map1", String.class, Integer.class));
+          Assert.assertEquals("check list", list, row.getList("list1", Integer.class));
         }
-        if (count == 2) {
-          Assert.assertEquals(1, row.getInt("age"));
-          Assert.assertEquals(1.0, row.getFloat("floatValue"), 2);
+        if (age == 1) {
+          Assert.assertEquals("check float", 1.0, row.getFloat("floatValue"), 2);
           Set<Integer> set = new HashSet<Integer>();
           List<Integer> list = new ArrayList<Integer>();
           Map<String, Integer> map = new HashMap<String, Integer>();
           set.add(1);
           list.add(1);
           map.put("key1", 1);
-          Assert.assertEquals(set, row.getSet("set1", Integer.class));
-          Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
-          Assert.assertEquals(list, row.getList("list1", Integer.class));
+          Assert.assertEquals("check set", set, row.getSet("set1", Integer.class));
+          Assert.assertEquals("check map", map, row.getMap("map1", String.class, Integer.class));
+          Assert.assertEquals("check list", list, row.getList("list1", Integer.class));
         }
-        count++;
       }
-
     }
-
   }
 
   private static class TestInputOperator extends CassandraPOJOInputOperator
@@ -237,34 +229,29 @@ public class CassandraOperatorTest
     TestOutputOperator outputOperator = new TestOutputOperator();
 
     outputOperator.setTablename(TABLE_NAME);
-    ArrayList<String> columns = new ArrayList<String>();
-    columns.add("id");
-    columns.add("age");
-    columns.add("doubleValue");
-    columns.add("floatValue");
-    columns.add("last_visited");
-    columns.add("lastname");
-    columns.add("list1");
-    columns.add("map1");
-    columns.add("set1");
-    columns.add("test");
-    outputOperator.setColumns(columns);
-    ArrayList<String> expressions = new ArrayList<String>();
-    expressions.add("id");
-    expressions.add("age");
-    expressions.add("doubleValue");
-    expressions.add("floatValue");
-    expressions.add("last_visited");
-    expressions.add("lastname");
-    expressions.add("list1");
-    expressions.add("map1");
-    expressions.add("set1");
-    expressions.add("test");
-    outputOperator.setExpressions(expressions);
-    outputOperator.setStore(transactionalStore);
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("id", "id", null));
+    fieldInfos.add(new FieldInfo("age", "age", null));
+    fieldInfos.add(new FieldInfo("doubleValue", "doubleValue", null));
+    fieldInfos.add(new FieldInfo("floatValue", "floatValue", null));
+    fieldInfos.add(new FieldInfo("last_visited", "last_visited", null));
+    fieldInfos.add(new FieldInfo("lastname", "lastname", null));
+    fieldInfos.add(new FieldInfo("list1", "list1", null));
+    fieldInfos.add(new FieldInfo("map1", "map1", null));
+    fieldInfos.add(new FieldInfo("set1", "set1", null));
+    fieldInfos.add(new FieldInfo("test", "test", null));
 
+    outputOperator.setStore(transactionalStore);
+    outputOperator.setFieldInfos(fieldInfos);
     outputOperator.setup(context);
 
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPojo.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+
+    outputOperator.input.setup(tpc);
+    outputOperator.activate(context);
+
     List<TestPojo> events = Lists.newArrayList();
     for (int i = 0; i < 3; i++) {
       Set<Integer> set = new HashSet<Integer>();
@@ -277,7 +264,7 @@ public class CassandraOperatorTest
     }
 
     outputOperator.beginWindow(0);
-    for (TestPojo event: events) {
+    for (TestPojo event : events) {
       outputOperator.input.process(event);
     }
     outputOperator.endWindow();
@@ -290,7 +277,7 @@ public class CassandraOperatorTest
    * This test can be run on cassandra server installed on node17.
    */
   @Test
-  public void TestCassandraInputOperator()
+  public void testCassandraInputOperator()
   {
     String query1 = "SELECT * FROM " + KEYSPACE + "." + "%t;";
     CassandraStore store = new CassandraStore();
@@ -303,25 +290,28 @@ public class CassandraOperatorTest
 
     TestInputOperator inputOperator = new TestInputOperator();
     inputOperator.setStore(store);
-    inputOperator.setOutputClass("com.datatorrent.contrib.cassandra.TestInputPojo");
     inputOperator.setTablename(TABLE_NAME_INPUT);
     inputOperator.setQuery(query1);
-    ArrayList<String> columns = new ArrayList<String>();
-    columns.add("id");
-    columns.add("age");
-    columns.add("lastname");
-
-    inputOperator.setColumns(columns);
-    ArrayList<String> expressions = new ArrayList<String>();
-    expressions.add("id");
-    expressions.add("age");
-    expressions.add("lastname");
-    inputOperator.setExpressions(expressions);
-    inputOperator.insertEventsInTable(30);
     inputOperator.setPrimaryKeyColumn("id");
-    CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("id", "id", null));
+    fieldInfos.add(new FieldInfo("age", "age", null));
+    fieldInfos.add(new FieldInfo("lastname", "lastname", null));
+    inputOperator.setFieldInfos(fieldInfos);
+
+    inputOperator.insertEventsInTable(30);
+    CollectorTestSink<Object> sink = new CollectorTestSink<>();
     inputOperator.outputPort.setSink(sink);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestInputPojo.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+
     inputOperator.setup(context);
+    inputOperator.outputPort.setup(tpc);
+    inputOperator.activate(context);
+
     inputOperator.beginWindow(0);
     inputOperator.emitTuples();
     inputOperator.endWindow();
@@ -336,8 +326,14 @@ public class CassandraOperatorTest
     }
 
     sink.clear();
+    inputOperator.columnDataTypes.clear();
+
     String query2 = "SELECT * FROM " + KEYSPACE + "." + "%t where token(%p) > %v;";
     inputOperator.setQuery(query2);
+    inputOperator.setup(context);
+    inputOperator.outputPort.setup(tpc);
+    inputOperator.activate(context);
+
     inputOperator.setStartRow(10);
     inputOperator.beginWindow(1);
     inputOperator.emitTuples();
@@ -345,15 +341,20 @@ public class CassandraOperatorTest
     Assert.assertEquals("rows from db", 14, sink.collectedTuples.size());
 
     sink.clear();
+    inputOperator.columnDataTypes.clear();
+
     String query3 = "SELECT * FROM " + KEYSPACE + "." + "%t where token(%p) > %v LIMIT %l;";
     inputOperator.setQuery(query3);
+    inputOperator.setup(context);
+    inputOperator.outputPort.setup(tpc);
+    inputOperator.activate(context);
+
     inputOperator.setStartRow(1);
     inputOperator.setLimit(10);
     inputOperator.beginWindow(2);
     inputOperator.emitTuples();
     inputOperator.endWindow();
     Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
-
   }
 
   public static class TestPojo



[2/2] incubator-apex-malhar git commit: Merge branch 'cassandraSchemaIntegration' into devel-3

Posted by is...@apache.org.
Merge branch 'cassandraSchemaIntegration' into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9194a72c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9194a72c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9194a72c

Branch: refs/heads/devel-3
Commit: 9194a72c352ff3a3fe5e75301c7aeb482e2181ea
Parents: f7f9085 effc838
Author: ishark <is...@datatorrent.com>
Authored: Fri Sep 18 13:26:52 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Sep 18 13:26:52 2015 -0700

----------------------------------------------------------------------
 .../AbstractCassandraInputOperator.java         |   9 +-
 .../cassandra/CassandraPOJOInputOperator.java   | 276 +++++++++----------
 .../cassandra/CassandraPOJOOutputOperator.java  | 145 +++++-----
 .../cassandra/CassandraOperatorTest.java        | 159 +++++------
 4 files changed, 293 insertions(+), 296 deletions(-)
----------------------------------------------------------------------