You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this conversion.
Posted to commits@apex.apache.org by is...@apache.org on 2015/09/18 22:37:19 UTC
[1/2] incubator-apex-malhar git commit: Cassandra integration with Schema Support
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 f7f9085e8 -> 9194a72c3
Cassandra integration with Schema Support
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/effc8385
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/effc8385
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/effc8385
Branch: refs/heads/devel-3
Commit: effc8385af173448f0dfc9e8e2243b5fd728acef
Parents: f7f9085
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Sun Aug 2 21:27:59 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Sep 18 13:02:44 2015 -0700
----------------------------------------------------------------------
.../AbstractCassandraInputOperator.java | 9 +-
.../cassandra/CassandraPOJOInputOperator.java | 276 +++++++++----------
.../cassandra/CassandraPOJOOutputOperator.java | 145 +++++-----
.../cassandra/CassandraOperatorTest.java | 159 +++++------
4 files changed, 293 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/effc8385/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 0c14a0e..7560768 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -99,14 +99,14 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
public void emitTuples()
{
String query = queryToRetrieveData();
- logger.debug(String.format("select statement: %s", query));
+ logger.debug("select statement: {}", query);
try {
ResultSet result = store.getSession().execute(query);
if (!result.isExhausted()) {
for (Row row : result) {
T tuple = getTuple(row);
- outputPort.emit(tuple);
+ emit(tuple);
}
} else {
// No rows available wait for some time before retrying so as to not continuously slam the database
@@ -118,4 +118,9 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
DTThrowable.rethrow(ex);
}
}
+
+ protected void emit(T tuple)
+ {
+ outputPort.emit(tuple);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/effc8385/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index 9d8e356..13f4dd0 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -21,17 +21,23 @@ import java.util.*;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.Row;
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.lib.util.PojoUtils.*;
-import com.datatorrent.api.Context.OperatorContext;
/**
* <p>
@@ -49,29 +55,38 @@ import com.datatorrent.api.Context.OperatorContext;
* @since 3.0.0
*/
@Evolving
-public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object>
+public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext>
{
@NotNull
- private List<String> columns;
- private final transient List<DataType> columnDataTypes;
+ private List<FieldInfo> fieldInfos;
private Number startRow = 0;
@NotNull
- private List<String> expressions;
- @NotNull
private String tablename;
- private final transient List<Object> setters;
@NotNull
private String query;
-
- private transient Class<?> objectClass = null;
@NotNull
- protected String primaryKeyColumn;
- protected transient DataType primaryKeyColumnType;
- private transient Row lastRowInBatch;
+ private String primaryKeyColumn;
@Min(1)
private int limit = 10;
+ private transient DataType primaryKeyColumnType;
+ private transient Row lastRowInBatch;
+
+ protected final transient List<Object> setters;
+ protected final transient List<DataType> columnDataTypes;
+ protected transient Class<?> pojoClass;
+
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
+ {
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+ };
+
/*
* Number of records to be fetched in one time from cassandra table.
*/
@@ -114,25 +129,6 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
}
/*
- * POJO class which is generated as output from this operator.
- * Example:
- * public class TestPOJO{ int intfield; public int getInt(){} public void setInt(){} }
- * outputClass = TestPOJO
- * POJOs will be generated on fly in later implementation.
- */
- private String outputClass;
-
- public String getOutputClass()
- {
- return outputClass;
- }
-
- public void setOutputClass(String outputClass)
- {
- this.outputClass = outputClass;
- }
-
- /*
* Parameterized query with parameters such as %t for table name , %p for primary key, %s for start value and %l for limit.
* Example of retrieveQuery:
* select * from %t where token(%p) > %s limit %l;
@@ -147,31 +143,25 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
this.query = query.replace("%t", tablename);
}
- /*
- * An ArrayList of Java expressions that will yield the cassandra column value to be set in output object.
- * Each expression corresponds to one column in the Cassandra table.
+ /**
+ * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
*/
- public List<String> getExpressions()
+ public List<FieldInfo> getFieldInfos()
{
- return expressions;
+ return fieldInfos;
}
- public void setExpressions(List<String> expressions)
- {
- this.expressions = expressions;
- }
-
- /*
- * List of column names specified by User in the same order as expressions for the particular fields.
+ /**
+ * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/>
+ * The value from fieldInfo.column is assigned to fieldInfo.pojoFieldExpression.
+ *
+ * @description $[].columnName name of the database column name
+ * @description $[].pojoFieldExpression pojo field name or expression
+ * @useSchema $[].pojoFieldExpression outputPort.fields[].name
*/
- public List<String> getColumns()
+ public void setFieldInfos(List<FieldInfo> fieldInfos)
{
- return columns;
- }
-
- public void setColumns(List<String> columns)
- {
- this.columns = columns;
+ this.fieldInfos = fieldInfos;
}
/*
@@ -196,87 +186,72 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
}
@Override
- public void setup(OperatorContext context)
+ public void activate(OperatorContext context)
{
- super.setup(context);
- if (setters.isEmpty()) {
- try {
- // This code will be replaced after integration of creating POJOs on the fly utility.
- objectClass = Class.forName(outputClass);
- }
- catch (ClassNotFoundException ex) {
- throw new RuntimeException(ex);
- }
+ com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1);
+ ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
- com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1);
- ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
-
- primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
- if(query.contains("%p"))
- {
- query = query.replace("%p", primaryKeyColumn);
- }
- if(query.contains("%l"))
- {
- query = query.replace("%l", limit+"");
- }
-
- logger.debug("query is {}",query);
-
- //In case columns is a subset
- int columnSize = columns.size();
- for (int i = 0; i < columnSize; i++) {
- // Get the designated column's data type.
- DataType type = rsMetaData.getType(columns.get(i));
- columnDataTypes.add(type);
- Object setter;
- final String setterExpr = expressions.get(i);
- switch (type.getName()) {
- case ASCII:
- case TEXT:
- case VARCHAR:
- setter = PojoUtils.createSetter(objectClass, setterExpr, String.class);
- break;
- case BOOLEAN:
- setter = PojoUtils.createSetterBoolean(objectClass, setterExpr);
- break;
- case INT:
- setter = PojoUtils.createSetterInt(objectClass, setterExpr);
- break;
- case BIGINT:
- case COUNTER:
- setter = PojoUtils.createSetterLong(objectClass, setterExpr);
- break;
- case FLOAT:
- setter = PojoUtils.createSetterFloat(objectClass, setterExpr);
- break;
- case DOUBLE:
- setter = PojoUtils.createSetterDouble(objectClass, setterExpr);
- break;
- case DECIMAL:
- setter = PojoUtils.createSetter(objectClass, setterExpr, BigDecimal.class);
- break;
- case SET:
- setter = PojoUtils.createSetter(objectClass, setterExpr, Set.class);
- break;
- case MAP:
- setter = PojoUtils.createSetter(objectClass, setterExpr, Map.class);
- break;
- case LIST:
- setter = PojoUtils.createSetter(objectClass, setterExpr, List.class);
- break;
- case TIMESTAMP:
- setter = PojoUtils.createSetter(objectClass, setterExpr, Date.class);
- break;
- case UUID:
- setter = PojoUtils.createSetter(objectClass, setterExpr, UUID.class);
- break;
- default:
- setter = PojoUtils.createSetter(objectClass, setterExpr, Object.class);
- break;
- }
- setters.add(setter);
+ primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
+ if (query.contains("%p")) {
+ query = query.replace("%p", primaryKeyColumn);
+ }
+ if (query.contains("%l")) {
+ query = query.replace("%l", limit + "");
+ }
+
+ LOG.debug("query is {}", query);
+
+ for (FieldInfo fieldInfo : fieldInfos) {
+ // Get the designated column's data type.
+ DataType type = rsMetaData.getType(fieldInfo.getColumnName());
+ columnDataTypes.add(type);
+ Object setter;
+ final String setterExpr = fieldInfo.getPojoFieldExpression();
+ switch (type.getName()) {
+ case ASCII:
+ case TEXT:
+ case VARCHAR:
+ setter = PojoUtils.createSetter(pojoClass, setterExpr, String.class);
+ break;
+ case BOOLEAN:
+ setter = PojoUtils.createSetterBoolean(pojoClass, setterExpr);
+ break;
+ case INT:
+ setter = PojoUtils.createSetterInt(pojoClass, setterExpr);
+ break;
+ case BIGINT:
+ case COUNTER:
+ setter = PojoUtils.createSetterLong(pojoClass, setterExpr);
+ break;
+ case FLOAT:
+ setter = PojoUtils.createSetterFloat(pojoClass, setterExpr);
+ break;
+ case DOUBLE:
+ setter = PojoUtils.createSetterDouble(pojoClass, setterExpr);
+ break;
+ case DECIMAL:
+ setter = PojoUtils.createSetter(pojoClass, setterExpr, BigDecimal.class);
+ break;
+ case SET:
+ setter = PojoUtils.createSetter(pojoClass, setterExpr, Set.class);
+ break;
+ case MAP:
+ setter = PojoUtils.createSetter(pojoClass, setterExpr, Map.class);
+ break;
+ case LIST:
+ setter = PojoUtils.createSetter(pojoClass, setterExpr, List.class);
+ break;
+ case TIMESTAMP:
+ setter = PojoUtils.createSetter(pojoClass, setterExpr, Date.class);
+ break;
+ case UUID:
+ setter = PojoUtils.createSetter(pojoClass, setterExpr, UUID.class);
+ break;
+ default:
+ setter = PojoUtils.createSetter(pojoClass, setterExpr, Object.class);
+ break;
}
+ setters.add(setter);
}
}
@@ -285,23 +260,19 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
public Object getTuple(Row row)
{
lastRowInBatch = row;
- Object obj = null;
- final int size = columnDataTypes.size();
+ Object obj;
try {
// This code will be replaced after integration of creating POJOs on the fly utility.
- obj = objectClass.newInstance();
- }
- catch (InstantiationException ex) {
- throw new RuntimeException(ex);
+ obj = pojoClass.newInstance();
}
- catch (IllegalAccessException ex) {
+ catch (InstantiationException | IllegalAccessException ex) {
throw new RuntimeException(ex);
}
- for (int i = 0; i < size; i++) {
+ for (int i = 0; i < columnDataTypes.size(); i++) {
DataType type = columnDataTypes.get(i);
- String columnName = columns.get(i);
+ String columnName = fieldInfos.get(i).getColumnName();
switch (type.getName()) {
case UUID:
final UUID id = row.getUUID(columnName);
@@ -370,16 +341,10 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
@Override
public String queryToRetrieveData()
{
- String parameterizedQuery;
- if(query.contains("%v"))
- {
- parameterizedQuery = query.replace("%v", startRow+"");
+ if (query.contains("%v")) {
+ return query.replace("%v", startRow + "");
}
- else
- {
- parameterizedQuery = query;
- }
- return parameterizedQuery;
+ return query;
}
@@ -411,5 +376,16 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
}
- private static final Logger logger = LoggerFactory.getLogger(CassandraPOJOInputOperator.class);
+ @Override
+ protected void emit(Object tuple)
+ {
+ outputPort.emit(tuple);
+ }
+
+ @Override
+ public void deactivate()
+ {
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraPOJOInputOperator.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/effc8385/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
index 61ef26c..bc4f97e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
@@ -15,16 +15,6 @@
*/
package com.datatorrent.contrib.cassandra;
-import com.datastax.driver.core.*;
-import com.datastax.driver.core.exceptions.DriverException;
-import com.datatorrent.lib.util.PojoUtils;
-import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
-import com.datatorrent.lib.util.PojoUtils.GetterDouble;
-import com.datatorrent.lib.util.PojoUtils.GetterFloat;
-import com.datatorrent.lib.util.PojoUtils.GetterInt;
-import com.datatorrent.lib.util.PojoUtils.GetterLong;
-import com.datatorrent.lib.util.PojoUtils.Getter;
-
import java.math.BigDecimal;
import java.util.*;
@@ -34,6 +24,18 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.DriverException;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.*;
+
/**
* <p>
* CassandraOutputOperator class.</p>
@@ -45,46 +47,36 @@ import org.slf4j.LoggerFactory;
* @since 2.1.0
*/
@Evolving
-public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object>
+public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object> implements Operator.ActivationListener<Context.OperatorContext>
{
- private static final long serialVersionUID = 201506181024L;
@NotNull
- private ArrayList<String> columns;
- private final transient ArrayList<DataType> columnDataTypes;
+ private List<FieldInfo> fieldInfos;
@NotNull
- private ArrayList<String> expressions;
- private final transient ArrayList<Object> getters;
-
- /*
- * An ArrayList of Java expressions that will yield the field value from the POJO.
- * Each expression corresponds to one column in the Cassandra table.
- */
- public ArrayList<String> getExpressions()
- {
- return expressions;
- }
+ private String tablename;
- public void setExpressions(ArrayList<String> expressions)
- {
- this.expressions = expressions;
- }
+ protected final transient ArrayList<DataType> columnDataTypes;
+ protected final transient ArrayList<Object> getters;
+ protected transient Class<?> pojoClass;
- /*
- * An ArrayList of Columns in the Cassandra Table.
+ /**
+ * The input port on which tuples are received for writing.
*/
- public ArrayList<String> getColumns()
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
{
- return columns;
- }
-
- public void setColumns(ArrayList<String> columns)
- {
- this.columns = columns;
- }
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
- @NotNull
- private String tablename;
+ @Override
+ public void process(Object tuple)
+ {
+ CassandraPOJOOutputOperator.super.input.process(tuple);
+ }
+ };
/*
* Tablename in cassandra.
@@ -106,63 +98,63 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
getters = new ArrayList<Object>();
}
- public void processFirstTuple(Object tuple)
+ @Override
+ public void activate(Context.OperatorContext context)
{
com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename);
final ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
final int numberOfColumns = rsMetaData.size();
- final Class<?> fqcn = tuple.getClass();
for (int i = 0; i < numberOfColumns; i++) {
// get the designated column's data type.
final DataType type = rsMetaData.getType(i);
columnDataTypes.add(type);
final Object getter;
- final String getterExpr = expressions.get(i);
+ final String getterExpr = fieldInfos.get(i).getPojoFieldExpression();
switch (type.getName()) {
case ASCII:
case TEXT:
case VARCHAR:
- getter = PojoUtils.createGetter(fqcn, getterExpr, String.class);
+ getter = PojoUtils.createGetter(pojoClass, getterExpr, String.class);
break;
case BOOLEAN:
- getter = PojoUtils.createGetterBoolean(fqcn, getterExpr);
+ getter = PojoUtils.createGetterBoolean(pojoClass, getterExpr);
break;
case INT:
- getter = PojoUtils.createGetterInt(fqcn, getterExpr);
+ getter = PojoUtils.createGetterInt(pojoClass, getterExpr);
break;
case BIGINT:
case COUNTER:
- getter = PojoUtils.createGetterLong(fqcn, getterExpr);
+ getter = PojoUtils.createGetterLong(pojoClass, getterExpr);
break;
case FLOAT:
- getter = PojoUtils.createGetterFloat(fqcn, getterExpr);
+ getter = PojoUtils.createGetterFloat(pojoClass, getterExpr);
break;
case DOUBLE:
- getter = PojoUtils.createGetterDouble(fqcn, getterExpr);
+ getter = PojoUtils.createGetterDouble(pojoClass, getterExpr);
break;
case DECIMAL:
- getter = PojoUtils.createGetter(fqcn, getterExpr, BigDecimal.class);
+ getter = PojoUtils.createGetter(pojoClass, getterExpr, BigDecimal.class);
break;
case SET:
- getter = PojoUtils.createGetter(fqcn, getterExpr, Set.class);
+ getter = PojoUtils.createGetter(pojoClass, getterExpr, Set.class);
break;
case MAP:
- getter = PojoUtils.createGetter(fqcn, getterExpr, Map.class);
+ getter = PojoUtils.createGetter(pojoClass, getterExpr, Map.class);
break;
case LIST:
- getter = PojoUtils.createGetter(fqcn, getterExpr, List.class);
+ getter = PojoUtils.createGetter(pojoClass, getterExpr, List.class);
break;
case TIMESTAMP:
- getter = PojoUtils.createGetter(fqcn, getterExpr, Date.class);
+ getter = PojoUtils.createGetter(pojoClass, getterExpr, Date.class);
break;
case UUID:
- getter = PojoUtils.createGetter(fqcn, getterExpr, UUID.class);
+ getter = PojoUtils.createGetter(pojoClass, getterExpr, UUID.class);
break;
default:
- getter = PojoUtils.createGetter(fqcn, getterExpr, Object.class);
+ getter = PojoUtils.createGetter(pojoClass, getterExpr, Object.class);
break;
}
getters.add(getter);
@@ -170,17 +162,22 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
}
@Override
+ public void deactivate()
+ {
+ }
+
+ @Override
protected PreparedStatement getUpdateCommand()
{
- StringBuilder queryfields = new StringBuilder("");
- StringBuilder values = new StringBuilder("");
- for (String column: columns) {
+ StringBuilder queryfields = new StringBuilder();
+ StringBuilder values = new StringBuilder();
+ for (FieldInfo fieldInfo: fieldInfos) {
if (queryfields.length() == 0) {
- queryfields.append(column);
+ queryfields.append(fieldInfo.getColumnName());
values.append("?");
}
else {
- queryfields.append(",").append(column);
+ queryfields.append(",").append(fieldInfo.getColumnName());
values.append(",").append("?");
}
}
@@ -197,9 +194,6 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
@SuppressWarnings("unchecked")
protected Statement setStatementParameters(PreparedStatement updateCommand, Object tuple) throws DriverException
{
- if (getters.isEmpty()) {
- processFirstTuple(tuple);
- }
final BoundStatement boundStmnt = new BoundStatement(updateCommand);
final int size = columnDataTypes.size();
for (int i = 0; i < size; i++) {
@@ -263,5 +257,26 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
return boundStmnt;
}
+ /**
+ * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
+ */
+ public List<FieldInfo> getFieldInfos()
+ {
+ return fieldInfos;
+ }
+
+ /**
+ * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/>
+ * The value from fieldInfo.column is assigned to fieldInfo.pojoFieldExpression.
+ *
+ * @description $[].columnName name of the database column name
+ * @description $[].pojoFieldExpression pojo field name or expression
+ * @useSchema $[].pojoFieldExpression input.fields[].name
+ */
+ public void setFieldInfos(List<FieldInfo> fieldInfos)
+ {
+ this.fieldInfos = fieldInfos;
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(CassandraPOJOOutputOperator.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/effc8385/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index 6f5f75c..ab7c91e 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -17,13 +17,19 @@ package com.datatorrent.contrib.cassandra;
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.DriverException;
+import com.datatorrent.api.Attribute;
import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
+
+import com.datatorrent.lib.helper.TestPortContext;
+import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.google.common.collect.Lists;
import java.util.*;
+
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -62,6 +68,7 @@ public class CassandraOperatorTest
@BeforeClass
public static void setup()
{
+ @SuppressWarnings("UnusedDeclaration") Class<?> clazz = org.codehaus.janino.CompilerFactory.class;
try {
cluster = Cluster.builder()
.addContactPoint(NODE).build();
@@ -100,8 +107,6 @@ public class CassandraOperatorTest
private static class TestOutputOperator extends CassandraPOJOOutputOperator
{
- private static final long serialVersionUID = 201506181038L;
-
public long getNumOfEventsInStore()
{
String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
@@ -117,65 +122,52 @@ public class CassandraOperatorTest
{
String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
ResultSet resultSetRecords = session.execute(recordsQuery);
- int count = 0;
for (Row row: resultSetRecords) {
- LOG.debug("Boolean value is {}", row.getBool("test"));
- Assert.assertEquals(true, row.getBool("test"));
- LOG.debug("lastname returned is {}", row.getString("lastname"));
- Assert.assertEquals("abclast", row.getString("lastname"));
- LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
- Assert.assertEquals("Double value is", 2.0, row.getDouble("doubleValue"), 2);
- LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
- LOG.debug("age returned is {}", row.getInt("age"));
- LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
- LOG.debug("list returned is {}", row.getList("list1", Integer.class));
- LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
- LOG.debug("date returned is {}", row.getDate("last_visited"));
- Assert.assertNotEquals(new Date(System.currentTimeMillis()), row.getDate("last_visited"));
- if (count == 0) {
- Assert.assertEquals(2, row.getInt("age"));
- Assert.assertEquals(2.0, row.getFloat("floatValue"), 2);
+ int age = row.getInt("age");
+ Assert.assertEquals("check boolean", true, row.getBool("test"));
+ Assert.assertEquals("check last name", "abclast", row.getString("lastname"));
+ Assert.assertEquals("check double", 2.0, row.getDouble("doubleValue"), 2);
+ LOG.debug("age returned is {}", age);
+ Assert.assertNotEquals("check date", new Date(System.currentTimeMillis()), row.getDate("last_visited"));
+ if (age == 2) {
+ Assert.assertEquals("check float", 2.0, row.getFloat("floatValue"), 2);
Set<Integer> set = new HashSet<Integer>();
List<Integer> list = new ArrayList<Integer>();
Map<String, Integer> map = new HashMap<String, Integer>();
set.add(2);
list.add(2);
map.put("key2", 2);
- Assert.assertEquals(set, row.getSet("set1", Integer.class));
- Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
- Assert.assertEquals(list, row.getList("list1", Integer.class));
+
+ Assert.assertEquals("check set", set, row.getSet("set1", Integer.class));
+ Assert.assertEquals("check map", map, row.getMap("map1", String.class, Integer.class));
+ Assert.assertEquals("check list", list, row.getList("list1", Integer.class));
}
- if (count == 1) {
- Assert.assertEquals(0, row.getInt("age"));
- Assert.assertEquals(0.0, row.getFloat("floatValue"), 2);
+ if (age == 0) {
+ Assert.assertEquals("check float", 0.0, row.getFloat("floatValue"), 2);
Set<Integer> set = new HashSet<Integer>();
List<Integer> list = new ArrayList<Integer>();
Map<String, Integer> map = new HashMap<String, Integer>();
set.add(0);
list.add(0);
map.put("key0", 0);
- Assert.assertEquals(set, row.getSet("set1", Integer.class));
- Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
- Assert.assertEquals(list, row.getList("list1", Integer.class));
+ Assert.assertEquals("check set", set, row.getSet("set1", Integer.class));
+ Assert.assertEquals("check map", map, row.getMap("map1", String.class, Integer.class));
+ Assert.assertEquals("check list", list, row.getList("list1", Integer.class));
}
- if (count == 2) {
- Assert.assertEquals(1, row.getInt("age"));
- Assert.assertEquals(1.0, row.getFloat("floatValue"), 2);
+ if (age == 1) {
+ Assert.assertEquals("check float", 1.0, row.getFloat("floatValue"), 2);
Set<Integer> set = new HashSet<Integer>();
List<Integer> list = new ArrayList<Integer>();
Map<String, Integer> map = new HashMap<String, Integer>();
set.add(1);
list.add(1);
map.put("key1", 1);
- Assert.assertEquals(set, row.getSet("set1", Integer.class));
- Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
- Assert.assertEquals(list, row.getList("list1", Integer.class));
+ Assert.assertEquals("check set", set, row.getSet("set1", Integer.class));
+ Assert.assertEquals("check map", map, row.getMap("map1", String.class, Integer.class));
+ Assert.assertEquals("check list", list, row.getList("list1", Integer.class));
}
- count++;
}
-
}
-
}
private static class TestInputOperator extends CassandraPOJOInputOperator
@@ -237,34 +229,29 @@ public class CassandraOperatorTest
TestOutputOperator outputOperator = new TestOutputOperator();
outputOperator.setTablename(TABLE_NAME);
- ArrayList<String> columns = new ArrayList<String>();
- columns.add("id");
- columns.add("age");
- columns.add("doubleValue");
- columns.add("floatValue");
- columns.add("last_visited");
- columns.add("lastname");
- columns.add("list1");
- columns.add("map1");
- columns.add("set1");
- columns.add("test");
- outputOperator.setColumns(columns);
- ArrayList<String> expressions = new ArrayList<String>();
- expressions.add("id");
- expressions.add("age");
- expressions.add("doubleValue");
- expressions.add("floatValue");
- expressions.add("last_visited");
- expressions.add("lastname");
- expressions.add("list1");
- expressions.add("map1");
- expressions.add("set1");
- expressions.add("test");
- outputOperator.setExpressions(expressions);
- outputOperator.setStore(transactionalStore);
+ List<FieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new FieldInfo("id", "id", null));
+ fieldInfos.add(new FieldInfo("age", "age", null));
+ fieldInfos.add(new FieldInfo("doubleValue", "doubleValue", null));
+ fieldInfos.add(new FieldInfo("floatValue", "floatValue", null));
+ fieldInfos.add(new FieldInfo("last_visited", "last_visited", null));
+ fieldInfos.add(new FieldInfo("lastname", "lastname", null));
+ fieldInfos.add(new FieldInfo("list1", "list1", null));
+ fieldInfos.add(new FieldInfo("map1", "map1", null));
+ fieldInfos.add(new FieldInfo("set1", "set1", null));
+ fieldInfos.add(new FieldInfo("test", "test", null));
+ outputOperator.setStore(transactionalStore);
+ outputOperator.setFieldInfos(fieldInfos);
outputOperator.setup(context);
+ Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPojo.class);
+ TestPortContext tpc = new TestPortContext(portAttributes);
+
+ outputOperator.input.setup(tpc);
+ outputOperator.activate(context);
+
List<TestPojo> events = Lists.newArrayList();
for (int i = 0; i < 3; i++) {
Set<Integer> set = new HashSet<Integer>();
@@ -277,7 +264,7 @@ public class CassandraOperatorTest
}
outputOperator.beginWindow(0);
- for (TestPojo event: events) {
+ for (TestPojo event : events) {
outputOperator.input.process(event);
}
outputOperator.endWindow();
@@ -290,7 +277,7 @@ public class CassandraOperatorTest
* This test can be run on cassandra server installed on node17.
*/
@Test
- public void TestCassandraInputOperator()
+ public void testCassandraInputOperator()
{
String query1 = "SELECT * FROM " + KEYSPACE + "." + "%t;";
CassandraStore store = new CassandraStore();
@@ -303,25 +290,28 @@ public class CassandraOperatorTest
TestInputOperator inputOperator = new TestInputOperator();
inputOperator.setStore(store);
- inputOperator.setOutputClass("com.datatorrent.contrib.cassandra.TestInputPojo");
inputOperator.setTablename(TABLE_NAME_INPUT);
inputOperator.setQuery(query1);
- ArrayList<String> columns = new ArrayList<String>();
- columns.add("id");
- columns.add("age");
- columns.add("lastname");
-
- inputOperator.setColumns(columns);
- ArrayList<String> expressions = new ArrayList<String>();
- expressions.add("id");
- expressions.add("age");
- expressions.add("lastname");
- inputOperator.setExpressions(expressions);
- inputOperator.insertEventsInTable(30);
inputOperator.setPrimaryKeyColumn("id");
- CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+
+ List<FieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new FieldInfo("id", "id", null));
+ fieldInfos.add(new FieldInfo("age", "age", null));
+ fieldInfos.add(new FieldInfo("lastname", "lastname", null));
+ inputOperator.setFieldInfos(fieldInfos);
+
+ inputOperator.insertEventsInTable(30);
+ CollectorTestSink<Object> sink = new CollectorTestSink<>();
inputOperator.outputPort.setSink(sink);
+
+ Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributes.put(Context.PortContext.TUPLE_CLASS, TestInputPojo.class);
+ TestPortContext tpc = new TestPortContext(portAttributes);
+
inputOperator.setup(context);
+ inputOperator.outputPort.setup(tpc);
+ inputOperator.activate(context);
+
inputOperator.beginWindow(0);
inputOperator.emitTuples();
inputOperator.endWindow();
@@ -336,8 +326,14 @@ public class CassandraOperatorTest
}
sink.clear();
+ inputOperator.columnDataTypes.clear();
+
String query2 = "SELECT * FROM " + KEYSPACE + "." + "%t where token(%p) > %v;";
inputOperator.setQuery(query2);
+ inputOperator.setup(context);
+ inputOperator.outputPort.setup(tpc);
+ inputOperator.activate(context);
+
inputOperator.setStartRow(10);
inputOperator.beginWindow(1);
inputOperator.emitTuples();
@@ -345,15 +341,20 @@ public class CassandraOperatorTest
Assert.assertEquals("rows from db", 14, sink.collectedTuples.size());
sink.clear();
+ inputOperator.columnDataTypes.clear();
+
String query3 = "SELECT * FROM " + KEYSPACE + "." + "%t where token(%p) > %v LIMIT %l;";
inputOperator.setQuery(query3);
+ inputOperator.setup(context);
+ inputOperator.outputPort.setup(tpc);
+ inputOperator.activate(context);
+
inputOperator.setStartRow(1);
inputOperator.setLimit(10);
inputOperator.beginWindow(2);
inputOperator.emitTuples();
inputOperator.endWindow();
Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
-
}
public static class TestPojo
[2/2] incubator-apex-malhar git commit: Merge branch 'cassandraSchemaIntegration' into devel-3
Posted by is...@apache.org.
Merge branch 'cassandraSchemaIntegration' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9194a72c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9194a72c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9194a72c
Branch: refs/heads/devel-3
Commit: 9194a72c352ff3a3fe5e75301c7aeb482e2181ea
Parents: f7f9085 effc838
Author: ishark <is...@datatorrent.com>
Authored: Fri Sep 18 13:26:52 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Sep 18 13:26:52 2015 -0700
----------------------------------------------------------------------
.../AbstractCassandraInputOperator.java | 9 +-
.../cassandra/CassandraPOJOInputOperator.java | 276 +++++++++----------
.../cassandra/CassandraPOJOOutputOperator.java | 145 +++++-----
.../cassandra/CassandraOperatorTest.java | 159 +++++------
4 files changed, 293 insertions(+), 296 deletions(-)
----------------------------------------------------------------------