You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/05/25 20:25:29 UTC
[2/4] incubator-apex-malhar git commit: APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature
APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d094a04f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d094a04f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d094a04f
Branch: refs/heads/master
Commit: d094a04fcbb3cea459610e55dae30ef2c86179a7
Parents: 4b126aa
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Wed Feb 10 14:45:03 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Tue May 17 14:31:14 2016 -0700
----------------------------------------------------------------------
.../AbstractCassandraInputOperator.java | 36 ++++++++-
.../cassandra/CassandraPOJOInputOperator.java | 77 +++++---------------
2 files changed, 52 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d094a04f/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 366d97a..0f2f0d0 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -19,8 +19,10 @@
package com.datatorrent.contrib.cassandra;
+import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +47,8 @@ import com.datatorrent.netlet.util.DTThrowable;
public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> {
private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
-
+ private PagingState nextPageState;
+ private int fetchSize;
int waitForDataTimeout = 100;
@AutoMetric
protected long tuplesRead;
@@ -112,8 +115,15 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
String query = queryToRetrieveData();
logger.debug("select statement: {}", query);
+ SimpleStatement stmt = new SimpleStatement(query);
+ stmt.setFetchSize(fetchSize);
try {
- ResultSet result = store.getSession().execute(query);
+ if (nextPageState != null) {
+ stmt.setPagingState(nextPageState);
+ }
+ ResultSet result = store.getSession().execute(stmt);
+ nextPageState = result.getExecutionInfo().getPagingState();
+
if (!result.isExhausted()) {
for (Row row : result) {
T tuple = getTuple(row);
@@ -124,8 +134,7 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
// No rows available wait for some time before retrying so as to not continuously slam the database
Thread.sleep(waitForDataTimeout);
}
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
store.disconnect();
DTThrowable.rethrow(ex);
}
@@ -135,4 +144,23 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
{
outputPort.emit(tuple);
}
+
+ /**
+ * Gets the page fetch size.
+ * @return the page size passed to {@code SimpleStatement.setFetchSize} for each query
+ */
+ public int getFetchSize()
+ {
+ return fetchSize;
+ }
+
+ /**
+ * Sets the page fetch size applied to subsequent queries.
+ * @param fetchSize page size to pass to {@code SimpleStatement.setFetchSize}
+ */
+ public void setFetchSize(int fetchSize)
+ {
+ this.fetchSize = fetchSize;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d094a04f/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index 3d87711..b7301cb 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -61,6 +61,7 @@ import com.datatorrent.lib.util.PojoUtils.*;
@Evolving
public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext>
{
+ private String TOKEN_QUERY;
@NotNull
private List<FieldInfo> fieldInfos;
private Number startRow;
@@ -71,15 +72,9 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
private String query;
@NotNull
private String primaryKeyColumn;
-
@Min(1)
private int limit = 10;
- private String TOKEN_QUERY;
- private transient DataType primaryKeyColumnType;
- private transient Row lastRowInBatch;
- private transient BoundStatement fetchKeyStatement;
-
protected final transient List<Object> setters;
protected final transient List<DataType> columnDataTypes;
protected transient Class<?> pojoClass;
@@ -94,14 +89,19 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
}
};
- /*
- * Number of records to be fetched in one time from cassandra table.
+ /**
+ * Gets number of records to be fetched at one time from cassandra table.
+ * @return limit
*/
public int getLimit()
{
return limit;
}
+ /**
+ * Sets number of records to be fetched at one time from cassandra table.
+ * @param limit
+ */
public void setLimit(int limit)
{
this.limit = limit;
@@ -138,7 +138,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
/*
* Parameterized query with parameters such as %t for table name , %p for primary key, %s for start value and %l for limit.
* Example of retrieveQuery:
- * select * from %t where token(%p) > %s limit %l;
+ * select * from %t where token(%p) > %s LIMIT %l;
*/
public String getQuery()
{
@@ -196,22 +196,22 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
public void setup(OperatorContext context)
{
super.setup(context);
- Long keyToken;
TOKEN_QUERY = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " = ?";
- PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY);
- fetchKeyStatement = new BoundStatement(statement);
- if (startRow != null && (keyToken = fetchKeyTokenFromDB(startRow)) != null) {
- startRowToken = keyToken;
- }
}
@Override
public void activate(OperatorContext context)
{
+ Long keyToken;
+ if (startRow != null) {
+ if ((keyToken = fetchKeyTokenFromDB(startRow)) != null) {
+ startRowToken = keyToken;
+ }
+ }
+
com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1);
ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
- primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
if (query.contains("%t")) {
query = query.replace("%t", tablename);
}
@@ -282,7 +282,6 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
@SuppressWarnings("unchecked")
public Object getTuple(Row row)
{
- lastRowInBatch = row;
Object obj;
try {
@@ -370,48 +369,12 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
return query;
}
-
- /*
- * Overriding emitTupes to save primarykey column value from last row in batch.
- */
- @Override
- public void emitTuples()
- {
- super.emitTuples();
- if (lastRowInBatch != null) {
- startRowToken = getPrimaryKeyToken(primaryKeyColumnType.getName());
- }
- }
-
- private Long getPrimaryKeyToken(DataType.Name primaryKeyDataType)
- {
- Object keyValue;
- switch (primaryKeyDataType) {
- case UUID:
- keyValue = lastRowInBatch.getUUID(primaryKeyColumn);
- break;
- case INT:
- keyValue = lastRowInBatch.getInt(primaryKeyColumn);
- break;
- case COUNTER:
- keyValue = lastRowInBatch.getLong(primaryKeyColumn);
- break;
- case FLOAT:
- keyValue = lastRowInBatch.getFloat(primaryKeyColumn);
- break;
- case DOUBLE:
- keyValue = lastRowInBatch.getDouble(primaryKeyColumn);
- break;
- default:
- throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName());
- }
- return fetchKeyTokenFromDB(keyValue);
- }
-
private Long fetchKeyTokenFromDB(Object keyValue)
{
- fetchKeyStatement.bind(keyValue);
- ResultSet rs = store.getSession().execute(fetchKeyStatement);
+ PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY);
+ BoundStatement boundStatement = new BoundStatement(statement);
+ boundStatement.bind(keyValue);
+ ResultSet rs = store.getSession().execute(boundStatement);
Long keyTokenValue = rs.one().getLong(0);
return keyTokenValue;
}