You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/05/25 20:25:29 UTC

[2/4] incubator-apex-malhar git commit: APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature

APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d094a04f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d094a04f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d094a04f

Branch: refs/heads/master
Commit: d094a04fcbb3cea459610e55dae30ef2c86179a7
Parents: 4b126aa
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Wed Feb 10 14:45:03 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Tue May 17 14:31:14 2016 -0700

----------------------------------------------------------------------
 .../AbstractCassandraInputOperator.java         | 36 ++++++++-
 .../cassandra/CassandraPOJOInputOperator.java   | 77 +++++---------------
 2 files changed, 52 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d094a04f/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 366d97a..0f2f0d0 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -19,8 +19,10 @@
 package com.datatorrent.contrib.cassandra;
 
 
+import com.datastax.driver.core.PagingState;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +47,8 @@ import com.datatorrent.netlet.util.DTThrowable;
 public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> {
 
   private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
-
+  private PagingState nextPageState;
+  private int fetchSize;
   int waitForDataTimeout = 100;
   @AutoMetric
   protected long tuplesRead;
@@ -112,8 +115,15 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
     String query = queryToRetrieveData();
     logger.debug("select statement: {}", query);
 
+    SimpleStatement stmt = new SimpleStatement(query);
+    stmt.setFetchSize(fetchSize);
     try {
-      ResultSet result = store.getSession().execute(query);
+      if (nextPageState != null) {
+        stmt.setPagingState(nextPageState);
+      }
+      ResultSet result = store.getSession().execute(stmt);
+      nextPageState = result.getExecutionInfo().getPagingState();
+
       if (!result.isExhausted()) {
         for (Row row : result) {
           T tuple = getTuple(row);
@@ -124,8 +134,7 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
         // No rows available wait for some time before retrying so as to not continuously slam the database
         Thread.sleep(waitForDataTimeout);
       }
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       store.disconnect();
       DTThrowable.rethrow(ex);
     }
@@ -135,4 +144,23 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
   {
     outputPort.emit(tuple);
   }
+
+  /**
+   * Gets the page fetch size applied to the statement used to retrieve data.
+   * @return the configured page fetch size
+   */
+  public int getFetchSize()
+  {
+    return fetchSize;
+  }
+
+  /**
+   * Sets the page fetch size applied to the statement used to retrieve data.
+   * @param fetchSize the page fetch size to use
+   */
+  public void setFetchSize(int fetchSize)
+  {
+    this.fetchSize = fetchSize;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d094a04f/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index 3d87711..b7301cb 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -61,6 +61,7 @@ import com.datatorrent.lib.util.PojoUtils.*;
 @Evolving
 public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext>
 {
+  private String TOKEN_QUERY;
   @NotNull
   private List<FieldInfo> fieldInfos;
   private Number startRow;
@@ -71,15 +72,9 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   private String query;
   @NotNull
   private String primaryKeyColumn;
-
   @Min(1)
   private int limit = 10;
 
-  private String TOKEN_QUERY;
-  private transient DataType primaryKeyColumnType;
-  private transient Row lastRowInBatch;
-  private transient BoundStatement fetchKeyStatement;
-
   protected final transient List<Object> setters;
   protected final transient List<DataType> columnDataTypes;
   protected transient Class<?> pojoClass;
@@ -94,14 +89,19 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
     }
   };
 
-  /*
-   * Number of records to be fetched in one time from cassandra table.
+  /**
+   * Gets the number of records to be fetched at one time from the Cassandra table.
+   * @return the configured limit
    */
   public int getLimit()
   {
     return limit;
   }
 
+  /**
+   * Sets the number of records to be fetched at one time from the Cassandra table.
+   * @param limit number of records to fetch at one time; the field is constrained by {@code @Min(1)}
+   */
   public void setLimit(int limit)
   {
     this.limit = limit;
@@ -138,7 +138,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   /*
    * Parameterized query with parameters such as %t for table name , %p for primary key, %s for start value and %l for limit.
    * Example of retrieveQuery:
-   * select * from %t where token(%p) > %s limit %l;
+   * select * from %t where token(%p) > %s LIMIT %l;
    */
   public String getQuery()
   {
@@ -196,22 +196,22 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   public void setup(OperatorContext context)
   {
     super.setup(context);
-    Long keyToken;
     TOKEN_QUERY = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " =  ?";
-    PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY);
-    fetchKeyStatement = new BoundStatement(statement);
-    if (startRow != null && (keyToken = fetchKeyTokenFromDB(startRow)) != null) {
-      startRowToken = keyToken;
-    }
   }
 
   @Override
   public void activate(OperatorContext context)
   {
+    Long keyToken;
+    if (startRow != null) {
+      if ((keyToken = fetchKeyTokenFromDB(startRow)) != null) {
+        startRowToken = keyToken;
+      }
+    }
+
     com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1);
     ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
 
-    primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
     if (query.contains("%t")) {
       query = query.replace("%t", tablename);
     }
@@ -282,7 +282,6 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   @SuppressWarnings("unchecked")
   public Object getTuple(Row row)
   {
-    lastRowInBatch = row;
     Object obj;
 
     try {
@@ -370,48 +369,12 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
     return query;
   }
 
-
-  /*
-   * Overriding emitTupes to save primarykey column value from last row in batch.
-   */
-  @Override
-  public void emitTuples()
-  {
-    super.emitTuples();
-    if (lastRowInBatch != null) {
-      startRowToken = getPrimaryKeyToken(primaryKeyColumnType.getName());
-    }
-  }
-
-  private Long getPrimaryKeyToken(DataType.Name primaryKeyDataType)
-  {
-    Object keyValue;
-    switch (primaryKeyDataType) {
-      case UUID:
-        keyValue = lastRowInBatch.getUUID(primaryKeyColumn);
-        break;
-      case INT:
-        keyValue = lastRowInBatch.getInt(primaryKeyColumn);
-        break;
-      case COUNTER:
-        keyValue = lastRowInBatch.getLong(primaryKeyColumn);
-        break;
-      case FLOAT:
-        keyValue = lastRowInBatch.getFloat(primaryKeyColumn);
-        break;
-      case DOUBLE:
-        keyValue = lastRowInBatch.getDouble(primaryKeyColumn);
-        break;
-      default:
-        throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName());
-    }
-    return fetchKeyTokenFromDB(keyValue);
-  }
-
   private Long fetchKeyTokenFromDB(Object keyValue)
   {
-    fetchKeyStatement.bind(keyValue);
-    ResultSet rs = store.getSession().execute(fetchKeyStatement);
+    PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY); // prepared on every call to this method
+    BoundStatement boundStatement = new BoundStatement(statement);
+    boundStatement.bind(keyValue); // binds keyValue to the single '?' placeholder of TOKEN_QUERY
+    ResultSet rs = store.getSession().execute(boundStatement); // NOTE(review): the next line dereferences rs.one() unchecked - confirm a row always exists for keyValue
     Long keyTokenValue = rs.one().getLong(0);
     return keyTokenValue;
   }