You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/05/25 20:25:28 UTC
[1/4] incubator-apex-malhar git commit: Renaming a variable
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/master 9c11400a2 -> dc36b9896
Renaming a variable
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/802cfce2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/802cfce2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/802cfce2
Branch: refs/heads/master
Commit: 802cfce223a6296b2a3a9d61a89d4461b7b96fed
Parents: d094a04
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Thu May 12 16:59:29 2016 -0700
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Tue May 17 14:31:14 2016 -0700
----------------------------------------------------------------------
.../contrib/cassandra/CassandraPOJOInputOperator.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/802cfce2/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index b7301cb..9c56178 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -61,7 +61,7 @@ import com.datatorrent.lib.util.PojoUtils.*;
@Evolving
public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext>
{
- private String TOKEN_QUERY;
+ private String tokenQuery;
@NotNull
private List<FieldInfo> fieldInfos;
private Number startRow;
@@ -196,7 +196,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
public void setup(OperatorContext context)
{
super.setup(context);
- TOKEN_QUERY = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " = ?";
+ tokenQuery = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " = ?";
}
@Override
@@ -371,7 +371,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
private Long fetchKeyTokenFromDB(Object keyValue)
{
- PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY);
+ PreparedStatement statement = store.getSession().prepare(tokenQuery);
BoundStatement boundStatement = new BoundStatement(statement);
boundStatement.bind(keyValue);
ResultSet rs = store.getSession().execute(boundStatement);
[2/4] incubator-apex-malhar git commit: APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature
Posted by bh...@apache.org.
APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d094a04f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d094a04f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d094a04f
Branch: refs/heads/master
Commit: d094a04fcbb3cea459610e55dae30ef2c86179a7
Parents: 4b126aa
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Wed Feb 10 14:45:03 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Tue May 17 14:31:14 2016 -0700
----------------------------------------------------------------------
.../AbstractCassandraInputOperator.java | 36 ++++++++-
.../cassandra/CassandraPOJOInputOperator.java | 77 +++++---------------
2 files changed, 52 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d094a04f/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 366d97a..0f2f0d0 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -19,8 +19,10 @@
package com.datatorrent.contrib.cassandra;
+import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +47,8 @@ import com.datatorrent.netlet.util.DTThrowable;
public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> {
private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
-
+ private PagingState nextPageState;
+ private int fetchSize;
int waitForDataTimeout = 100;
@AutoMetric
protected long tuplesRead;
@@ -112,8 +115,15 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
String query = queryToRetrieveData();
logger.debug("select statement: {}", query);
+ SimpleStatement stmt = new SimpleStatement(query);
+ stmt.setFetchSize(fetchSize);
try {
- ResultSet result = store.getSession().execute(query);
+ if (nextPageState != null) {
+ stmt.setPagingState(nextPageState);
+ }
+ ResultSet result = store.getSession().execute(stmt);
+ nextPageState = result.getExecutionInfo().getPagingState();
+
if (!result.isExhausted()) {
for (Row row : result) {
T tuple = getTuple(row);
@@ -124,8 +134,7 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
// No rows available wait for some time before retrying so as to not continuously slam the database
Thread.sleep(waitForDataTimeout);
}
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
store.disconnect();
DTThrowable.rethrow(ex);
}
@@ -135,4 +144,23 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
{
outputPort.emit(tuple);
}
+
+ /**
+ * Get page fetch Size
+ * @return
+ */
+ public int getFetchSize()
+ {
+ return fetchSize;
+ }
+
+ /**
+ * Set page fetch size
+ * @param fetchSize
+ */
+ public void setFetchSize(int fetchSize)
+ {
+ this.fetchSize = fetchSize;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d094a04f/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index 3d87711..b7301cb 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -61,6 +61,7 @@ import com.datatorrent.lib.util.PojoUtils.*;
@Evolving
public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext>
{
+ private String TOKEN_QUERY;
@NotNull
private List<FieldInfo> fieldInfos;
private Number startRow;
@@ -71,15 +72,9 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
private String query;
@NotNull
private String primaryKeyColumn;
-
@Min(1)
private int limit = 10;
- private String TOKEN_QUERY;
- private transient DataType primaryKeyColumnType;
- private transient Row lastRowInBatch;
- private transient BoundStatement fetchKeyStatement;
-
protected final transient List<Object> setters;
protected final transient List<DataType> columnDataTypes;
protected transient Class<?> pojoClass;
@@ -94,14 +89,19 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
}
};
- /*
- * Number of records to be fetched in one time from cassandra table.
+ /**
+ * Gets number of records to be fetched at one time from cassandra table.
+ * @return limit
*/
public int getLimit()
{
return limit;
}
+ /**
+ * Sets number of records to be fetched at one time from cassandra table.
+ * @param limit
+ */
public void setLimit(int limit)
{
this.limit = limit;
@@ -138,7 +138,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
/*
* Parameterized query with parameters such as %t for table name , %p for primary key, %s for start value and %l for limit.
* Example of retrieveQuery:
- * select * from %t where token(%p) > %s limit %l;
+ * select * from %t where token(%p) > %s LIMIT %l;
*/
public String getQuery()
{
@@ -196,22 +196,22 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
public void setup(OperatorContext context)
{
super.setup(context);
- Long keyToken;
TOKEN_QUERY = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " = ?";
- PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY);
- fetchKeyStatement = new BoundStatement(statement);
- if (startRow != null && (keyToken = fetchKeyTokenFromDB(startRow)) != null) {
- startRowToken = keyToken;
- }
}
@Override
public void activate(OperatorContext context)
{
+ Long keyToken;
+ if (startRow != null) {
+ if ((keyToken = fetchKeyTokenFromDB(startRow)) != null) {
+ startRowToken = keyToken;
+ }
+ }
+
com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1);
ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
- primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
if (query.contains("%t")) {
query = query.replace("%t", tablename);
}
@@ -282,7 +282,6 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
@SuppressWarnings("unchecked")
public Object getTuple(Row row)
{
- lastRowInBatch = row;
Object obj;
try {
@@ -370,48 +369,12 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
return query;
}
-
- /*
- * Overriding emitTupes to save primarykey column value from last row in batch.
- */
- @Override
- public void emitTuples()
- {
- super.emitTuples();
- if (lastRowInBatch != null) {
- startRowToken = getPrimaryKeyToken(primaryKeyColumnType.getName());
- }
- }
-
- private Long getPrimaryKeyToken(DataType.Name primaryKeyDataType)
- {
- Object keyValue;
- switch (primaryKeyDataType) {
- case UUID:
- keyValue = lastRowInBatch.getUUID(primaryKeyColumn);
- break;
- case INT:
- keyValue = lastRowInBatch.getInt(primaryKeyColumn);
- break;
- case COUNTER:
- keyValue = lastRowInBatch.getLong(primaryKeyColumn);
- break;
- case FLOAT:
- keyValue = lastRowInBatch.getFloat(primaryKeyColumn);
- break;
- case DOUBLE:
- keyValue = lastRowInBatch.getDouble(primaryKeyColumn);
- break;
- default:
- throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName());
- }
- return fetchKeyTokenFromDB(keyValue);
- }
-
private Long fetchKeyTokenFromDB(Object keyValue)
{
- fetchKeyStatement.bind(keyValue);
- ResultSet rs = store.getSession().execute(fetchKeyStatement);
+ PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY);
+ BoundStatement boundStatement = new BoundStatement(statement);
+ boundStatement.bind(keyValue);
+ ResultSet rs = store.getSession().execute(boundStatement);
Long keyTokenValue = rs.one().getLong(0);
return keyTokenValue;
}
[4/4] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-1988-cassandra-input' of https://github.com/DT-Priyanka/incubator-apex-malhar
Posted by bh...@apache.org.
Merge branch 'APEXMALHAR-1988-cassandra-input' of https://github.com/DT-Priyanka/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/dc36b989
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/dc36b989
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/dc36b989
Branch: refs/heads/master
Commit: dc36b98962a1b0a2138c271809699f880dfe72c6
Parents: 9c11400 b047b82
Author: bhupesh <bh...@gmail.com>
Authored: Wed May 25 11:30:12 2016 -0700
Committer: bhupesh <bh...@gmail.com>
Committed: Wed May 25 11:30:12 2016 -0700
----------------------------------------------------------------------
.../AbstractCassandraInputOperator.java | 41 +++++++---
.../cassandra/CassandraPOJOInputOperator.java | 79 ++++++--------------
2 files changed, 53 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-apex-malhar git commit: Removing autometrics
Posted by bh...@apache.org.
Removing autometrics
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b047b825
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b047b825
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b047b825
Branch: refs/heads/master
Commit: b047b82540b0526247e8a8bd925b66d1f38bffbe
Parents: 802cfce
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Tue May 17 14:33:41 2016 -0700
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Tue May 17 14:33:41 2016 -0700
----------------------------------------------------------------------
.../contrib/cassandra/AbstractCassandraInputOperator.java | 5 -----
1 file changed, 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b047b825/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 0f2f0d0..7bd47fc 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.lib.db.AbstractStoreInputOperator;
-import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.netlet.util.DTThrowable;
@@ -50,14 +49,11 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
private PagingState nextPageState;
private int fetchSize;
int waitForDataTimeout = 100;
- @AutoMetric
- protected long tuplesRead;
@Override
public void beginWindow(long l)
{
super.beginWindow(l);
- tuplesRead = 0;
}
/**
@@ -128,7 +124,6 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
for (Row row : result) {
T tuple = getTuple(row);
emit(tuple);
- tuplesRead++;
}
} else {
// No rows available wait for some time before retrying so as to not continuously slam the database