You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/05/25 22:36:45 UTC

incubator-apex-malhar git commit: APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 9c11400a2 -> a91287ce7


APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a91287ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a91287ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a91287ce

Branch: refs/heads/master
Commit: a91287ce7652f3f0000498ebfa33c16315f44e45
Parents: 9c11400
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Wed Feb 10 14:45:03 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Wed May 25 15:35:29 2016 -0700

----------------------------------------------------------------------
 .../AbstractCassandraInputOperator.java         | 41 +++++++---
 .../cassandra/CassandraPOJOInputOperator.java   | 79 ++++++--------------
 2 files changed, 53 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a91287ce/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 366d97a..7bd47fc 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -19,14 +19,15 @@
 package com.datatorrent.contrib.cassandra;
 
 
+import com.datastax.driver.core.PagingState;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.db.AbstractStoreInputOperator;
-import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.netlet.util.DTThrowable;
 
@@ -45,16 +46,14 @@ import com.datatorrent.netlet.util.DTThrowable;
 public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> {
 
   private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
-
+  private PagingState nextPageState;
+  private int fetchSize;
   int waitForDataTimeout = 100;
-  @AutoMetric
-  protected long tuplesRead;
 
   @Override
   public void beginWindow(long l)
   {
     super.beginWindow(l);
-    tuplesRead = 0;
   }
 
   /**
@@ -112,20 +111,25 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
     String query = queryToRetrieveData();
     logger.debug("select statement: {}", query);
 
+    SimpleStatement stmt = new SimpleStatement(query);
+    stmt.setFetchSize(fetchSize);
     try {
-      ResultSet result = store.getSession().execute(query);
+      if (nextPageState != null) {
+        stmt.setPagingState(nextPageState);
+      }
+      ResultSet result = store.getSession().execute(stmt);
+      nextPageState = result.getExecutionInfo().getPagingState();
+
       if (!result.isExhausted()) {
         for (Row row : result) {
           T tuple = getTuple(row);
           emit(tuple);
-          tuplesRead++;
         }
       } else {
         // No rows available wait for some time before retrying so as to not continuously slam the database
         Thread.sleep(waitForDataTimeout);
       }
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       store.disconnect();
       DTThrowable.rethrow(ex);
     }
@@ -135,4 +139,23 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
   {
     outputPort.emit(tuple);
   }
+
+  /**
+   * Gets the page fetch size that is set on the Cassandra query statement.
+   * @return the page fetch size
+   */
+  public int getFetchSize()
+  {
+    return fetchSize;
+  }
+
+  /**
+   * Sets the page fetch size that is set on the Cassandra query statement.
+   * @param fetchSize the page fetch size
+   */
+  public void setFetchSize(int fetchSize)
+  {
+    this.fetchSize = fetchSize;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a91287ce/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index 3d87711..9c56178 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -61,6 +61,7 @@ import com.datatorrent.lib.util.PojoUtils.*;
 @Evolving
 public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext>
 {
+  private String tokenQuery;
   @NotNull
   private List<FieldInfo> fieldInfos;
   private Number startRow;
@@ -71,15 +72,9 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   private String query;
   @NotNull
   private String primaryKeyColumn;
-
   @Min(1)
   private int limit = 10;
 
-  private String TOKEN_QUERY;
-  private transient DataType primaryKeyColumnType;
-  private transient Row lastRowInBatch;
-  private transient BoundStatement fetchKeyStatement;
-
   protected final transient List<Object> setters;
   protected final transient List<DataType> columnDataTypes;
   protected transient Class<?> pojoClass;
@@ -94,14 +89,19 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
     }
   };
 
-  /*
-   * Number of records to be fetched in one time from cassandra table.
+  /**
+   * Gets the number of records to be fetched at one time from the Cassandra table.
+   * @return the configured limit
    */
   public int getLimit()
   {
     return limit;
   }
 
+  /**
+   * Sets the number of records to be fetched at one time from the Cassandra table.
+   * @param limit the number of records to fetch at one time
+   */
   public void setLimit(int limit)
   {
     this.limit = limit;
@@ -138,7 +138,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   /*
    * Parameterized query with parameters such as %t for table name , %p for primary key, %s for start value and %l for limit.
    * Example of retrieveQuery:
-   * select * from %t where token(%p) > %s limit %l;
+   * select * from %t where token(%p) > %s LIMIT %l;
    */
   public String getQuery()
   {
@@ -196,22 +196,22 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   public void setup(OperatorContext context)
   {
     super.setup(context);
-    Long keyToken;
-    TOKEN_QUERY = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " =  ?";
-    PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY);
-    fetchKeyStatement = new BoundStatement(statement);
-    if (startRow != null && (keyToken = fetchKeyTokenFromDB(startRow)) != null) {
-      startRowToken = keyToken;
-    }
+    tokenQuery = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " =  ?";
   }
 
   @Override
   public void activate(OperatorContext context)
   {
+    Long keyToken;
+    if (startRow != null) {
+      if ((keyToken = fetchKeyTokenFromDB(startRow)) != null) {
+        startRowToken = keyToken;
+      }
+    }
+
     com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1);
     ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
 
-    primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
     if (query.contains("%t")) {
       query = query.replace("%t", tablename);
     }
@@ -282,7 +282,6 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   @SuppressWarnings("unchecked")
   public Object getTuple(Row row)
   {
-    lastRowInBatch = row;
     Object obj;
 
     try {
@@ -370,48 +369,12 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
     return query;
   }
 
-
-  /*
-   * Overriding emitTupes to save primarykey column value from last row in batch.
-   */
-  @Override
-  public void emitTuples()
-  {
-    super.emitTuples();
-    if (lastRowInBatch != null) {
-      startRowToken = getPrimaryKeyToken(primaryKeyColumnType.getName());
-    }
-  }
-
-  private Long getPrimaryKeyToken(DataType.Name primaryKeyDataType)
-  {
-    Object keyValue;
-    switch (primaryKeyDataType) {
-      case UUID:
-        keyValue = lastRowInBatch.getUUID(primaryKeyColumn);
-        break;
-      case INT:
-        keyValue = lastRowInBatch.getInt(primaryKeyColumn);
-        break;
-      case COUNTER:
-        keyValue = lastRowInBatch.getLong(primaryKeyColumn);
-        break;
-      case FLOAT:
-        keyValue = lastRowInBatch.getFloat(primaryKeyColumn);
-        break;
-      case DOUBLE:
-        keyValue = lastRowInBatch.getDouble(primaryKeyColumn);
-        break;
-      default:
-        throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName());
-    }
-    return fetchKeyTokenFromDB(keyValue);
-  }
-
   private Long fetchKeyTokenFromDB(Object keyValue)
   {
-    fetchKeyStatement.bind(keyValue);
-    ResultSet rs = store.getSession().execute(fetchKeyStatement);
+    PreparedStatement statement = store.getSession().prepare(tokenQuery);
+    BoundStatement boundStatement = new BoundStatement(statement);
+    boundStatement.bind(keyValue);
+    ResultSet rs = store.getSession().execute(boundStatement);
     Long keyTokenValue = rs.one().getLong(0);
     return keyTokenValue;
   }