Posted to commits@apex.apache.org by pr...@apache.org on 2016/05/25 22:36:45 UTC
incubator-apex-malhar git commit: APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/master 9c11400a2 -> a91287ce7
APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a91287ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a91287ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a91287ce
Branch: refs/heads/master
Commit: a91287ce7652f3f0000498ebfa33c16315f44e45
Parents: 9c11400
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Wed Feb 10 14:45:03 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Wed May 25 15:35:29 2016 -0700
----------------------------------------------------------------------
.../AbstractCassandraInputOperator.java | 41 +++++++---
.../cassandra/CassandraPOJOInputOperator.java | 79 ++++++--------------
2 files changed, 53 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
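[Editor's note] For readers new to the driver feature this commit adopts: instead of re-deriving a start token after every batch, the operator now wraps its CQL in a SimpleStatement, sets a fetch size, and stores the PagingState returned with each ResultSet so the next emitTuples() call resumes where the previous page ended. A minimal, self-contained sketch of that DataStax Java driver 2.x pattern follows; the contact point, keyspace and table names are hypothetical, and the explicit page-boundary break is just one way to process a single page per call.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;

public class PagingSketch
{
  public static void main(String[] args)
  {
    Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    try {
      Session session = cluster.connect();
      PagingState nextPageState = null;
      do {
        SimpleStatement stmt = new SimpleStatement("select * from demo_ks.demo_table");
        stmt.setFetchSize(100);                // rows per page
        if (nextPageState != null) {
          stmt.setPagingState(nextPageState);  // resume after the last fetched page
        }
        ResultSet result = session.execute(stmt);
        // getPagingState() returns null once the last page has been fetched
        nextPageState = result.getExecutionInfo().getPagingState();
        int remaining = result.getAvailableWithoutFetching();
        for (Row row : result) {
          System.out.println(row);
          if (--remaining == 0) {
            break;  // stop at the page boundary; plain iteration would auto-fetch the next page
          }
        }
      } while (nextPageState != null);
    } finally {
      cluster.close();
    }
  }
}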
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a91287ce/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 366d97a..7bd47fc 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -19,14 +19,15 @@
package com.datatorrent.contrib.cassandra;
+import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.lib.db.AbstractStoreInputOperator;
-import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.netlet.util.DTThrowable;
@@ -45,16 +46,14 @@ import com.datatorrent.netlet.util.DTThrowable;
public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> {
private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
-
+ private PagingState nextPageState;
+ private int fetchSize;
int waitForDataTimeout = 100;
- @AutoMetric
- protected long tuplesRead;
@Override
public void beginWindow(long l)
{
super.beginWindow(l);
- tuplesRead = 0;
}
/**
@@ -112,20 +111,25 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
String query = queryToRetrieveData();
logger.debug("select statement: {}", query);
+ SimpleStatement stmt = new SimpleStatement(query);
+ stmt.setFetchSize(fetchSize);
try {
- ResultSet result = store.getSession().execute(query);
+ if (nextPageState != null) {
+ stmt.setPagingState(nextPageState);
+ }
+ ResultSet result = store.getSession().execute(stmt);
+ nextPageState = result.getExecutionInfo().getPagingState();
+
if (!result.isExhausted()) {
for (Row row : result) {
T tuple = getTuple(row);
emit(tuple);
- tuplesRead++;
}
} else {
// No rows available wait for some time before retrying so as to not continuously slam the database
Thread.sleep(waitForDataTimeout);
}
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
store.disconnect();
DTThrowable.rethrow(ex);
}
@@ -135,4 +139,23 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
{
outputPort.emit(tuple);
}
+
+ /**
+ * Gets the page fetch size.
+ * @return the page fetch size
+ */
+ public int getFetchSize()
+ {
+ return fetchSize;
+ }
+
+ /**
+ * Sets the page fetch size, i.e. the number of rows the driver retrieves per page.
+ * @param fetchSize the number of rows to fetch per page
+ */
+ public void setFetchSize(int fetchSize)
+ {
+ this.fetchSize = fetchSize;
+ }
+
}
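[Editor's note] The new fetchSize property is the only tuning knob this file gains. A hedged sketch of setting it while wiring the operator into an Apex application; the operator name and values are hypothetical, and only setFetchSize() and the pre-existing setLimit() are shown because both appear in this commit.

// Inside StreamingApplication.populateDAG(DAG dag, Configuration conf):
CassandraPOJOInputOperator input =
    dag.addOperator("cassandraInput", new CassandraPOJOInputOperator());
input.setFetchSize(1000);  // rows per driver page; the field defaults to 0, which lets the driver pick
input.setLimit(10);        // existing property: LIMIT clause substituted for %l in the query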
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a91287ce/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index 3d87711..9c56178 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -61,6 +61,7 @@ import com.datatorrent.lib.util.PojoUtils.*;
@Evolving
public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext>
{
+ private String tokenQuery;
@NotNull
private List<FieldInfo> fieldInfos;
private Number startRow;
@@ -71,15 +72,9 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
private String query;
@NotNull
private String primaryKeyColumn;
-
@Min(1)
private int limit = 10;
- private String TOKEN_QUERY;
- private transient DataType primaryKeyColumnType;
- private transient Row lastRowInBatch;
- private transient BoundStatement fetchKeyStatement;
-
protected final transient List<Object> setters;
protected final transient List<DataType> columnDataTypes;
protected transient Class<?> pojoClass;
@@ -94,14 +89,19 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
}
};
- /*
- * Number of records to be fetched in one time from cassandra table.
+ /**
+ * Gets the number of records to be fetched at one time from the Cassandra table.
+ * @return limit
*/
public int getLimit()
{
return limit;
}
+ /**
+ * Sets the number of records to be fetched at one time from the Cassandra table.
+ * @param limit
+ */
public void setLimit(int limit)
{
this.limit = limit;
@@ -138,7 +138,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
/*
* Parameterized query with parameters such as %t for table name , %p for primary key, %s for start value and %l for limit.
* Example of retrieveQuery:
- * select * from %t where token(%p) > %s limit %l;
+ * select * from %t where token(%p) > %s LIMIT %l;
*/
public String getQuery()
{
@@ -196,22 +196,22 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
public void setup(OperatorContext context)
{
super.setup(context);
- Long keyToken;
- TOKEN_QUERY = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " = ?";
- PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY);
- fetchKeyStatement = new BoundStatement(statement);
- if (startRow != null && (keyToken = fetchKeyTokenFromDB(startRow)) != null) {
- startRowToken = keyToken;
- }
+ tokenQuery = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " = ?";
}
@Override
public void activate(OperatorContext context)
{
+ Long keyToken;
+ if (startRow != null) {
+ if ((keyToken = fetchKeyTokenFromDB(startRow)) != null) {
+ startRowToken = keyToken;
+ }
+ }
+
com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1);
ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
- primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
if (query.contains("%t")) {
query = query.replace("%t", tablename);
}
@@ -282,7 +282,6 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
@SuppressWarnings("unchecked")
public Object getTuple(Row row)
{
- lastRowInBatch = row;
Object obj;
try {
@@ -370,48 +369,12 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
return query;
}
-
- /*
- * Overriding emitTupes to save primarykey column value from last row in batch.
- */
- @Override
- public void emitTuples()
- {
- super.emitTuples();
- if (lastRowInBatch != null) {
- startRowToken = getPrimaryKeyToken(primaryKeyColumnType.getName());
- }
- }
-
- private Long getPrimaryKeyToken(DataType.Name primaryKeyDataType)
- {
- Object keyValue;
- switch (primaryKeyDataType) {
- case UUID:
- keyValue = lastRowInBatch.getUUID(primaryKeyColumn);
- break;
- case INT:
- keyValue = lastRowInBatch.getInt(primaryKeyColumn);
- break;
- case COUNTER:
- keyValue = lastRowInBatch.getLong(primaryKeyColumn);
- break;
- case FLOAT:
- keyValue = lastRowInBatch.getFloat(primaryKeyColumn);
- break;
- case DOUBLE:
- keyValue = lastRowInBatch.getDouble(primaryKeyColumn);
- break;
- default:
- throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName());
- }
- return fetchKeyTokenFromDB(keyValue);
- }
-
private Long fetchKeyTokenFromDB(Object keyValue)
{
- fetchKeyStatement.bind(keyValue);
- ResultSet rs = store.getSession().execute(fetchKeyStatement);
+ PreparedStatement statement = store.getSession().prepare(tokenQuery);
+ BoundStatement boundStatement = new BoundStatement(statement);
+ boundStatement.bind(keyValue);
+ ResultSet rs = store.getSession().execute(boundStatement);
Long keyTokenValue = rs.one().getLong(0);
return keyTokenValue;
}
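[Editor's note] For readers unfamiliar with the token() lookup that fetchKeyTokenFromDB() performs, a minimal sketch of the same prepare/bind/execute flow; the keyspace, table, column and key value are hypothetical, and getLong(0) assumes the default Murmur3 partitioner, whose tokens are 64-bit integers.

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

public class TokenLookupSketch
{
  public static void main(String[] args)
  {
    Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    try {
      Session session = cluster.connect();
      // Ask Cassandra for the partitioner token of a given key, so a scan can
      // restart from "where token(id) > <startRowToken>" instead of row 0.
      PreparedStatement ps = session.prepare(
          "select token(id) from demo_ks.demo_table where id = ?");
      BoundStatement bs = new BoundStatement(ps).bind(42);
      ResultSet rs = session.execute(bs);
      // rs.one() is null if the key does not exist in the table
      Long keyToken = rs.one().getLong(0);  // bigint under Murmur3Partitioner
      System.out.println("token = " + keyToken);
    } finally {
      cluster.close();
    }
  }
}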