You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/05/25 20:25:28 UTC

[1/4] incubator-apex-malhar git commit: Renaming a variable

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 9c11400a2 -> dc36b9896


Renaming a variable


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/802cfce2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/802cfce2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/802cfce2

Branch: refs/heads/master
Commit: 802cfce223a6296b2a3a9d61a89d4461b7b96fed
Parents: d094a04
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Thu May 12 16:59:29 2016 -0700
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Tue May 17 14:31:14 2016 -0700

----------------------------------------------------------------------
 .../contrib/cassandra/CassandraPOJOInputOperator.java          | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/802cfce2/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index b7301cb..9c56178 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -61,7 +61,7 @@ import com.datatorrent.lib.util.PojoUtils.*;
 @Evolving
 public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext>
 {
-  private String TOKEN_QUERY;
+  private String tokenQuery;
   @NotNull
   private List<FieldInfo> fieldInfos;
   private Number startRow;
@@ -196,7 +196,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   public void setup(OperatorContext context)
   {
     super.setup(context);
-    TOKEN_QUERY = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " =  ?";
+    tokenQuery = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " =  ?";
   }
 
   @Override
@@ -371,7 +371,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
 
   private Long fetchKeyTokenFromDB(Object keyValue)
   {
-    PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY);
+    PreparedStatement statement = store.getSession().prepare(tokenQuery);
     BoundStatement boundStatement = new BoundStatement(statement);
     boundStatement.bind(keyValue);
     ResultSet rs = store.getSession().execute(boundStatement);


[2/4] incubator-apex-malhar git commit: APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature

Posted by bh...@apache.org.
APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d094a04f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d094a04f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d094a04f

Branch: refs/heads/master
Commit: d094a04fcbb3cea459610e55dae30ef2c86179a7
Parents: 4b126aa
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Wed Feb 10 14:45:03 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Tue May 17 14:31:14 2016 -0700

----------------------------------------------------------------------
 .../AbstractCassandraInputOperator.java         | 36 ++++++++-
 .../cassandra/CassandraPOJOInputOperator.java   | 77 +++++---------------
 2 files changed, 52 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d094a04f/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 366d97a..0f2f0d0 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -19,8 +19,10 @@
 package com.datatorrent.contrib.cassandra;
 
 
+import com.datastax.driver.core.PagingState;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +47,8 @@ import com.datatorrent.netlet.util.DTThrowable;
 public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> {
 
   private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
-
+  private PagingState nextPageState;
+  private int fetchSize;
   int waitForDataTimeout = 100;
   @AutoMetric
   protected long tuplesRead;
@@ -112,8 +115,15 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
     String query = queryToRetrieveData();
     logger.debug("select statement: {}", query);
 
+    SimpleStatement stmt = new SimpleStatement(query);
+    stmt.setFetchSize(fetchSize);
     try {
-      ResultSet result = store.getSession().execute(query);
+      if (nextPageState != null) {
+        stmt.setPagingState(nextPageState);
+      }
+      ResultSet result = store.getSession().execute(stmt);
+      nextPageState = result.getExecutionInfo().getPagingState();
+
       if (!result.isExhausted()) {
         for (Row row : result) {
           T tuple = getTuple(row);
@@ -124,8 +134,7 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
         // No rows available wait for some time before retrying so as to not continuously slam the database
         Thread.sleep(waitForDataTimeout);
       }
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       store.disconnect();
       DTThrowable.rethrow(ex);
     }
@@ -135,4 +144,23 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
   {
     outputPort.emit(tuple);
   }
+
+  /**
+   * Get page fetch Size
+   * @return
+   */
+  public int getFetchSize()
+  {
+    return fetchSize;
+  }
+
+  /**
+   * Set page fetch size
+   * @param fetchSize
+   */
+  public void setFetchSize(int fetchSize)
+  {
+    this.fetchSize = fetchSize;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d094a04f/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index 3d87711..b7301cb 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -61,6 +61,7 @@ import com.datatorrent.lib.util.PojoUtils.*;
 @Evolving
 public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext>
 {
+  private String TOKEN_QUERY;
   @NotNull
   private List<FieldInfo> fieldInfos;
   private Number startRow;
@@ -71,15 +72,9 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   private String query;
   @NotNull
   private String primaryKeyColumn;
-
   @Min(1)
   private int limit = 10;
 
-  private String TOKEN_QUERY;
-  private transient DataType primaryKeyColumnType;
-  private transient Row lastRowInBatch;
-  private transient BoundStatement fetchKeyStatement;
-
   protected final transient List<Object> setters;
   protected final transient List<DataType> columnDataTypes;
   protected transient Class<?> pojoClass;
@@ -94,14 +89,19 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
     }
   };
 
-  /*
-   * Number of records to be fetched in one time from cassandra table.
+  /**
+   * Gets number of records to be fetched at one time from cassandra table.
+   * @return limit
    */
   public int getLimit()
   {
     return limit;
   }
 
+  /**
+   * Sets number of records to be fetched at one time from cassandra table.
+   * @param limit
+   */
   public void setLimit(int limit)
   {
     this.limit = limit;
@@ -138,7 +138,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   /*
    * Parameterized query with parameters such as %t for table name , %p for primary key, %s for start value and %l for limit.
    * Example of retrieveQuery:
-   * select * from %t where token(%p) > %s limit %l;
+   * select * from %t where token(%p) > %s LIMIT %l;
    */
   public String getQuery()
   {
@@ -196,22 +196,22 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   public void setup(OperatorContext context)
   {
     super.setup(context);
-    Long keyToken;
     TOKEN_QUERY = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " =  ?";
-    PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY);
-    fetchKeyStatement = new BoundStatement(statement);
-    if (startRow != null && (keyToken = fetchKeyTokenFromDB(startRow)) != null) {
-      startRowToken = keyToken;
-    }
   }
 
   @Override
   public void activate(OperatorContext context)
   {
+    Long keyToken;
+    if (startRow != null) {
+      if ((keyToken = fetchKeyTokenFromDB(startRow)) != null) {
+        startRowToken = keyToken;
+      }
+    }
+
     com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1);
     ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
 
-    primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
     if (query.contains("%t")) {
       query = query.replace("%t", tablename);
     }
@@ -282,7 +282,6 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
   @SuppressWarnings("unchecked")
   public Object getTuple(Row row)
   {
-    lastRowInBatch = row;
     Object obj;
 
     try {
@@ -370,48 +369,12 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
     return query;
   }
 
-
-  /*
-   * Overriding emitTupes to save primarykey column value from last row in batch.
-   */
-  @Override
-  public void emitTuples()
-  {
-    super.emitTuples();
-    if (lastRowInBatch != null) {
-      startRowToken = getPrimaryKeyToken(primaryKeyColumnType.getName());
-    }
-  }
-
-  private Long getPrimaryKeyToken(DataType.Name primaryKeyDataType)
-  {
-    Object keyValue;
-    switch (primaryKeyDataType) {
-      case UUID:
-        keyValue = lastRowInBatch.getUUID(primaryKeyColumn);
-        break;
-      case INT:
-        keyValue = lastRowInBatch.getInt(primaryKeyColumn);
-        break;
-      case COUNTER:
-        keyValue = lastRowInBatch.getLong(primaryKeyColumn);
-        break;
-      case FLOAT:
-        keyValue = lastRowInBatch.getFloat(primaryKeyColumn);
-        break;
-      case DOUBLE:
-        keyValue = lastRowInBatch.getDouble(primaryKeyColumn);
-        break;
-      default:
-        throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName());
-    }
-    return fetchKeyTokenFromDB(keyValue);
-  }
-
   private Long fetchKeyTokenFromDB(Object keyValue)
   {
-    fetchKeyStatement.bind(keyValue);
-    ResultSet rs = store.getSession().execute(fetchKeyStatement);
+    PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY);
+    BoundStatement boundStatement = new BoundStatement(statement);
+    boundStatement.bind(keyValue);
+    ResultSet rs = store.getSession().execute(boundStatement);
     Long keyTokenValue = rs.one().getLong(0);
     return keyTokenValue;
   }


[4/4] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-1988-cassandra-input' of https://github.com/DT-Priyanka/incubator-apex-malhar

Posted by bh...@apache.org.
Merge branch 'APEXMALHAR-1988-cassandra-input' of https://github.com/DT-Priyanka/incubator-apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/dc36b989
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/dc36b989
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/dc36b989

Branch: refs/heads/master
Commit: dc36b98962a1b0a2138c271809699f880dfe72c6
Parents: 9c11400 b047b82
Author: bhupesh <bh...@gmail.com>
Authored: Wed May 25 11:30:12 2016 -0700
Committer: bhupesh <bh...@gmail.com>
Committed: Wed May 25 11:30:12 2016 -0700

----------------------------------------------------------------------
 .../AbstractCassandraInputOperator.java         | 41 +++++++---
 .../cassandra/CassandraPOJOInputOperator.java   | 79 ++++++--------------
 2 files changed, 53 insertions(+), 67 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-apex-malhar git commit: Removing autometrics

Posted by bh...@apache.org.
Removing autometrics


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b047b825
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b047b825
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b047b825

Branch: refs/heads/master
Commit: b047b82540b0526247e8a8bd925b66d1f38bffbe
Parents: 802cfce
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Tue May 17 14:33:41 2016 -0700
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Tue May 17 14:33:41 2016 -0700

----------------------------------------------------------------------
 .../contrib/cassandra/AbstractCassandraInputOperator.java       | 5 -----
 1 file changed, 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b047b825/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 0f2f0d0..7bd47fc 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.db.AbstractStoreInputOperator;
-import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.netlet.util.DTThrowable;
 
@@ -50,14 +49,11 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
   private PagingState nextPageState;
   private int fetchSize;
   int waitForDataTimeout = 100;
-  @AutoMetric
-  protected long tuplesRead;
 
   @Override
   public void beginWindow(long l)
   {
     super.beginWindow(l);
-    tuplesRead = 0;
   }
 
   /**
@@ -128,7 +124,6 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp
         for (Row row : result) {
           T tuple = getTuple(row);
           emit(tuple);
-          tuplesRead++;
         }
       } else {
         // No rows available wait for some time before retrying so as to not continuously slam the database