You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/07/07 13:20:50 UTC

apex-malhar git commit: APEXMALHAR-2513 fixes for JdbcPollOperator

Repository: apex-malhar
Updated Branches:
  refs/heads/master b42d8e741 -> a37869e9a


APEXMALHAR-2513 fixes for JdbcPollOperator

Fix result set extraction to use column index.
Exit static partitions after work is done.
Propagate all exceptions from poller task.
Allow for override of DSLContext.
Add comment regarding SQL dialect support.
Fix poller example properties.
Documentation fixes.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a37869e9
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a37869e9
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a37869e9

Branch: refs/heads/master
Commit: a37869e9a253226dcd9ba55014d6dbcd8d95a308
Parents: b42d8e7
Author: Thomas Weise <th...@apache.org>
Authored: Sun Jul 2 17:00:34 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Thu Jul 6 10:28:01 2017 -0700

----------------------------------------------------------------------
 docs/operators/jdbcPollInputOperator.md         |  23 ++--
 .../META-INF/properties-PollJdbcToHDFSApp.xml   |  24 ++--
 .../db/jdbc/AbstractJdbcPollInputOperator.java  | 109 +++++++++++--------
 .../lib/db/jdbc/JdbcPollInputOperator.java      |   5 +-
 .../db/jdbc/JdbcPojoPollableOpeartorTest.java   |  11 +-
 5 files changed, 98 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/docs/operators/jdbcPollInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/jdbcPollInputOperator.md b/docs/operators/jdbcPollInputOperator.md
index aa1d107..822ace5 100644
--- a/docs/operators/jdbcPollInputOperator.md
+++ b/docs/operators/jdbcPollInputOperator.md
@@ -26,10 +26,10 @@ JDBC Poller Input operator addresses the first issue with an asynchronous worker
 Assumption is that there is an ordered column using which range queries can be formed. That means database has a column or combination of columns which has unique constraint as well as every newly inserted record should have column value more than max value in that column, as we poll only appended records.
 
 ## Use cases
-1. Scan huge database tables to either copy to other database or process it using **Apache Apex**. An example application using this operator to copy database contents to HDFS is available in the [examples repository](https://github.com/DataTorrent/examples/tree/master/tutorials/jdbcIngest). Look for "PollJdbcToHDFSApp" for example of this particular operator.
+1. Ingest large database tables. An example application that copies database contents to HDFS is available [here](https://github.com/apache/apex-malhar/blob/master/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplication.java).
 
 ## How to Use?
-The tuple type in the abstract class is a generic parameter. Concrete subclasses need to choose an appropriate class (such as String or an appropriate concrete java class, having no-argument constructor so that it can be serialized using kyro). Also implement a couple of abstract methods: `getTuple(ResultSet)` to convert database rows to objects of concrete class and `emitTuple(T)` to emit the tuple.
+The tuple type in the abstract class is a generic parameter. Concrete subclasses need to choose an appropriate class (such as String or an appropriate concrete java class, having no-argument constructor so that it can be serialized using Kryo). Also implement a couple of abstract methods: `getTuple(ResultSet)` to convert database rows to objects of concrete class and `emitTuple(T)` to emit the tuple.
 
 In principle, no ports need be defined in the rare case that the operator simply writes tuples to some external sink or merely maintains aggregated statistics. But in most common scenarios, the tuples need to be sent to one or more downstream operators for additional processing such as parsing, enrichment or aggregation; in such cases, appropriate output ports are defined and the emitTuple(T) implementation dispatches tuples to the desired output ports.
 
@@ -46,7 +46,7 @@ Only static partitioning is supported for JDBC Poller Input Operator. Configure
 
 ```xml
   <property>
-    <name>dt.operator.{OperatorName}.prop.partitionCount</name>
+    <name>apex.operator.{OperatorName}.prop.partitionCount</name>
     <value>4</value>
   </property>
 ```
@@ -62,8 +62,7 @@ Not supported.
 1. Operator location: ***malhar-library***
 2. Available since: ***3.5.0***
 3. Operator state: ***Evolving***
-4. Java Packages:
-    * Operator: ***[com.datatorrent.lib.db.jdbc.AbstractJdbcPollInputOperator](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.html)***
+4. Java Packages: ***[AbstractJdbcPollInputOperator](https://ci.apache.org/projects/apex-malhar/apex-malhar-javadoc-release-3.7/com/datatorrent/lib/db/jdbc/package-summary.html)***
 
 JDBC Poller is **idempotent**, **fault-tolerant** and **statically partitionable**.
 
@@ -99,27 +98,27 @@ Of these only `store` properties, `tableName`, `columnsExpression` and `key` are
 
 ```xml
 <property>
-  <name>dt.operator.{OperatorName}.prop.tableName</name>
+  <name>apex.operator.{OperatorName}.prop.tableName</name>
   <value>mytable</value>
 </property>
 <property>
-  <name>dt.operator.{OperatorName}.prop.columnsExpression</name>
+  <name>apex.operator.{OperatorName}.prop.columnsExpression</name>
   <value>column1,column2,column4</value>
 </property>
 <property>
-  <name>dt.operator.{OperatorName}.prop.key</name>
+  <name>apex.operator.{OperatorName}.prop.key</name>
   <value>keycolumn</value>
 </property>
 <property>
-  <name>dt.operator.{OperatorName}.prop.store.databaseDriver</name>
+  <name>apex.operator.{OperatorName}.prop.store.databaseDriver</name>
   <value>com.mysql.jdbc.Driver</value>
 </property>
 <property>
-  <name>dt.operator.{OperatorName}.prop.store.databaseUrl</name>
+  <name>apex.operator.{OperatorName}.prop.store.databaseUrl</name>
   <value>jdbc:mysql://localhost:3306/mydb</value>
 </property>
 <property>
-  <name>dt.operator.{OperatorName}.prop.store.connectionProps</name>
+  <name>apex.operator.{OperatorName}.prop.store.connectionProps</name>
   <value>user:myuser,password:mypassword</value>
 </property>
 ```
@@ -147,7 +146,7 @@ This operator defines following additional properties beyond those defined in th
 
 | **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
 | -------- | ----------- | ---- | ------------------ | ------------- |
-| *fieldInfos*| [FieldInfo](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/util/FieldInfo.html) maps a store column to a POJO field name.| List | Yes | N/A |
+| *fieldInfos*| Maps columns to POJO field names.| List | Yes | N/A |
 
 #### Platform Attributes that influence operator behavior
 | **Attribute** | **Description** | **Type** | **Mandatory** |

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml
----------------------------------------------------------------------
diff --git a/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml b/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml
index c75c7b6..e24c52e 100644
--- a/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml
+++ b/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml
@@ -23,68 +23,68 @@
     <!-- Static partitioning, specify the partition count, this decides how 
         many ranges would be initiated -->
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.partitionCount</name>
+        <name>apex.operator.JdbcPoller.prop.partitionCount</name>
         <value>2</value>
     </property>
 
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.store.databaseDriver</name>
+        <name>apex.operator.JdbcPoller.prop.store.databaseDriver</name>
         <!-- replace value with your jbdc driver -->
         <value>org.hsqldb.jdbcDriver</value>
     </property>
 
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.store.databaseUrl</name>
+        <name>apex.operator.JdbcPoller.prop.store.databaseUrl</name>
         <!-- replace value with your jbdc  url -->
         <value>jdbc:hsqldb:mem:test</value>
     </property>
 
     <!--property>
-        <name>dt.application.operator.JdbcPoller.prop.store.userName</name>
+        <name>apex.operator.JdbcPoller.prop.store.connectionProperties(user)</name>
         <value>username</value>
     </property>
     
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.store.password</name>
+        <name>apex.operator.JdbcPoller.prop.store.connectionProperties(password)</name>
         <value>password</value>
     </property-->
 
     <!-- Batch size for poller -->
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.batchSize</name>
+        <name>apex.operator.JdbcPoller.prop.batchSize</name>
         <value>50</value>
     </property>
 
     <!-- look-up key for forming range queries, this would be the column name 
         on which the table is sorted -->
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.key</name>
+        <name>apex.operator.JdbcPoller.prop.key</name>
         <value>ACCOUNT_NO</value>
     </property>
 
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.columnsExpression</name>
+        <name>apex.operator.JdbcPoller.prop.columnsExpression</name>
         <value>ACCOUNT_NO,NAME,AMOUNT</value>
     </property>
     <property>
-      <name>dt.application.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS</name>
+      <name>apex.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS</name>
       <value>org.apache.apex.examples.JdbcIngest.PojoEvent</value>
     </property>
 
     <!-- Table name -->
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.tableName</name>
+        <name>apex.operator.JdbcPoller.prop.tableName</name>
         <value>test_event_table</value>
     </property>
 
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.pollInterval</name>
+        <name>apex.operator.JdbcPoller.prop.pollInterval</name>
         <value>1000</value>
     </property>
 
     <!-- Output folder for HDFS output operator -->
     <property>
-        <name>dt.application.operator.Writer.filePath</name>
+        <name>apex.operator.Writer.filePath</name>
         <value>/tmp/test/output</value>
     </property>
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
index 86a443c..504f7fa 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -30,8 +30,8 @@ import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
@@ -39,7 +39,6 @@ import javax.validation.constraints.NotNull;
 import org.jooq.Condition;
 import org.jooq.DSLContext;
 import org.jooq.Field;
-import org.jooq.SelectField;
 import org.jooq.conf.ParamType;
 import org.jooq.impl.DSL;
 import org.jooq.tools.jdbc.JDBCUtils;
@@ -60,10 +59,10 @@ import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.db.AbstractStoreInputOperator;
 import com.datatorrent.lib.util.KeyValPair;
 import com.datatorrent.lib.util.KryoCloneUtils;
-import com.datatorrent.netlet.util.DTThrowable;
 
 import static java.sql.ResultSet.CONCUR_READ_ONLY;
 import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
@@ -83,10 +82,14 @@ import static org.jooq.impl.DSL.field;
  * Only newly added data will be fetched by the polling jdbc partition, also
  * assumption is rows won't be added or deleted in middle during scan.
  *
+ * The operator uses jOOQ to build the SQL queries based on the discovered {@link org.jooq.SQLDialect}.
+ * Note that some of the dialects (including Oracle) are only available in commercial
+ * jOOQ distributions. If the dialect is not available, a generic translation is applied,
+ * you can post-process the generated SQL by overriding {@link #buildRangeQuery(int, int)}.
  *
  * @displayName Jdbc Polling Input Operator
  * @category Input
- * @tags database, sql, jdbc, partitionable, idepotent, pollable
+ * @tags database, sql, jdbc, partitionable, idempotent, pollable
  *
  * @since 3.5.0
  */
@@ -115,7 +118,6 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
 
   @NotNull
   private String tableName;
-  @NotNull
   private String columnsExpression;
   @NotNull
   private String key;
@@ -126,11 +128,10 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   protected KeyValPair<Integer, Integer> rangeQueryPair;
   protected Integer lowerBound;
   protected Integer lastEmittedRow;
-  private transient int operatorId;
-  private transient DSLContext create;
+  protected transient DSLContext dslContext;
   private transient volatile boolean execute;
   private transient ScheduledExecutorService scanService;
-  private transient AtomicReference<Throwable> threadException;
+  private transient ScheduledFuture<?> pollFuture;
   protected transient boolean isPolled;
   protected transient LinkedBlockingDeque<T> emitQueue;
   protected transient PreparedStatement ps;
@@ -150,19 +151,18 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   public void setup(OperatorContext context)
   {
     super.setup(context);
-    intializeDSLContext();
+    dslContext = createDSLContext();
     if (scanService == null) {
       scanService = Executors.newScheduledThreadPool(1);
     }
     execute = true;
     emitQueue = new LinkedBlockingDeque<>(queueCapacity);
-    operatorId = context.getId();
     windowManager.setup(context);
   }
 
-  private void intializeDSLContext()
+  protected DSLContext createDSLContext()
   {
-    create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl()));
+    return DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl()));
   }
 
   @Override
@@ -172,7 +172,17 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     long largestRecoveryWindow = windowManager.getLargestCompletedWindow();
     if (largestRecoveryWindow == Stateless.WINDOW_ID
         || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
-      scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
+      schedulePollTask();
+    }
+  }
+
+  private void schedulePollTask()
+  {
+    if (isPollerPartition) {
+      pollFuture = scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
+    } else {
+      LOG.debug("Scheduling for one time execution.");
+      pollFuture = scanService.schedule(new DBPoller(), 0, TimeUnit.MILLISECONDS);
     }
   }
 
@@ -203,8 +213,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
         replay(currentWindowId);
         return;
       } catch (SQLException e) {
-        LOG.error("Exception in replayed windows", e);
-        throw new RuntimeException(e);
+        throw new RuntimeException("Replay failed", e);
       }
     }
     if (isPollerPartition) {
@@ -219,6 +228,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     if (currentWindowId <= windowManager.getLargestCompletedWindow()) {
       return;
     }
+
     int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() : batchSize;
     while (pollSize-- > 0) {
       T obj = emitQueue.poll();
@@ -234,6 +244,21 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   @Override
   public void endWindow()
   {
+    if (pollFuture != null && (pollFuture.isCancelled() || pollFuture.isDone())) {
+      try {
+        pollFuture.get();
+      } catch (Exception e) {
+        throw new RuntimeException("JDBC thread failed", e);
+      }
+
+      if (isPollerPartition) {
+        throw new IllegalStateException("poller task terminated");
+      } else {
+        // exit static query partition
+        BaseOperator.shutdown();
+      }
+    }
+
     try {
       if (currentWindowId > windowManager.getLargestCompletedWindow()) {
         currentWindowRecoveryState = new MutablePair<>(lowerBound, lastEmittedRow);
@@ -242,22 +267,19 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     } catch (IOException e) {
       throw new RuntimeException("saving recovery", e);
     }
-    if (threadException != null) {
-      store.disconnect();
-      DTThrowable.rethrow(threadException.get());
-    }
   }
 
   @Override
   public void deactivate()
   {
+    execute = false;
     scanService.shutdownNow();
     store.disconnect();
   }
 
   /**
-   * Function to insert results of a query in emit Queue
-   * @param preparedStatement PreparedStatement to execute the query and store the results in emit Queue.
+   * Execute the query and transfer results to the emit queue.
+   * @param preparedStatement PreparedStatement to execute the query and fetch results.
    */
   protected void insertDbDataInQueue(PreparedStatement preparedStatement) throws SQLException, InterruptedException
   {
@@ -293,14 +315,12 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
         insertDbDataInQueue(ps);
       }
       isPolled = true;
-    } catch (SQLException ex) {
-      execute = false;
-      threadException = new AtomicReference<Throwable>(ex);
-    } catch (InterruptedException e) {
-      threadException = new AtomicReference<Throwable>(e);
+    } catch (SQLException | InterruptedException ex) {
+      throw new RuntimeException(ex);
     } finally {
       if (!isPollerPartition) {
-        store.disconnect();
+        LOG.debug("fetched all records, marking complete.");
+        execute = false;
       }
     }
     isPolled = true;
@@ -310,7 +330,6 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
 
   protected void replay(long windowId) throws SQLException
   {
-
     try {
       @SuppressWarnings("unchecked")
       MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.retrieve(windowId);
@@ -318,14 +337,11 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
       if (recoveredData != null && shouldReplayWindow(recoveredData)) {
         LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left,
             recoveredData.right);
-
         ps = store.getConnection().prepareStatement(
             buildRangeQuery(recoveredData.left, (recoveredData.right - recoveredData.left)), TYPE_FORWARD_ONLY,
             CONCUR_READ_ONLY);
         LOG.info("Query formed to recover data - {}", ps.toString());
-
         emitReplayedTuples(ps);
-
       }
 
       if (currentWindowId == windowManager.getLargestCompletedWindow()) {
@@ -347,7 +363,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
               lastOffset = bound;
             }
           }
-          scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
+          schedulePollTask();
         } catch (SQLException e) {
           throw new RuntimeException(e);
         }
@@ -400,13 +416,13 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
       Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions, PartitioningContext context)
   {
-    List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(
+    List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<>(
         getPartitionCount());
 
     HashMap<Integer, KeyValPair<Integer, Integer>> partitionToRangeMap = null;
     try {
       store.connect();
-      intializeDSLContext();
+      dslContext = createDSLContext();
       partitionToRangeMap = getPartitionedQueryRangeMap(getPartitionCount());
     } catch (SQLException e) {
       LOG.error("Exception in initializing the partition range", e);
@@ -427,11 +443,11 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
       } else {
         // The upper bound for the n+1 partition is set to null since its a pollable partition
         int partitionKey = partitionToRangeMap.get(i - 1).getValue();
-        jdbcPoller.rangeQueryPair = new KeyValPair<Integer, Integer>(partitionKey, null);
+        jdbcPoller.rangeQueryPair = new KeyValPair<>(partitionKey, null);
         jdbcPoller.lastEmittedRow = partitionKey;
         jdbcPoller.isPollerPartition = true;
       }
-      newPartitions.add(new DefaultPartition<AbstractJdbcPollInputOperator<T>>(jdbcPoller));
+      newPartitions.add(new DefaultPartition<>(jdbcPoller));
     }
 
     return newPartitions;
@@ -457,10 +473,10 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     HashMap<Integer, KeyValPair<Integer, Integer>> partitionToQueryMap = new HashMap<>();
     int events = (rowCount / partitions);
     for (int i = 0, lowerOffset = 0, upperOffset = events; i < partitions - 1; i++, lowerOffset += events, upperOffset += events) {
-      partitionToQueryMap.put(i, new KeyValPair<Integer, Integer>(lowerOffset, upperOffset));
+      partitionToQueryMap.put(i, new KeyValPair<>(lowerOffset, upperOffset));
     }
 
-    partitionToQueryMap.put(partitions - 1, new KeyValPair<Integer, Integer>(events * (partitions - 1), (int)rowCount));
+    partitionToQueryMap.put(partitions - 1, new KeyValPair<>(events * (partitions - 1), rowCount));
     LOG.info("Partition map - " + partitionToQueryMap.toString());
     return partitionToQueryMap;
   }
@@ -476,7 +492,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     if (getWhereCondition() != null) {
       condition = condition.and(getWhereCondition());
     }
-    int recordsCount = create.select(DSL.count()).from(getTableName()).where(condition).fetchOne(0, int.class);
+    int recordsCount = dslContext.select(DSL.count()).from(getTableName()).where(condition).fetchOne(0, int.class);
     return recordsCount;
   }
 
@@ -496,10 +512,10 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
       for (String column : getColumnsExpression().split(",")) {
         columns.add(field(column));
       }
-      sqlQuery = create.select((Collection<? extends SelectField<?>>)columns).from(getTableName()).where(condition)
+      sqlQuery = dslContext.select(columns).from(getTableName()).where(condition)
           .orderBy(field(getKey())).limit(limit).offset(offset).getSQL(ParamType.INLINED);
     } else {
-      sqlQuery = create.select().from(getTableName()).where(condition).orderBy(field(getKey())).limit(limit)
+      sqlQuery = dslContext.select().from(getTableName()).where(condition).orderBy(field(getKey())).limit(limit)
           .offset(offset).getSQL(ParamType.INLINED);
     }
     LOG.info("DSL Query: " + sqlQuery);
@@ -515,10 +531,15 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     @Override
     public void run()
     {
-      while (execute) {
-        if ((isPollerPartition && !isPolled) || !isPollerPartition) {
-          pollRecords();
+      try {
+        LOG.debug("Entering poll task");
+        while (execute) {
+          if ((isPollerPartition && !isPolled) || !isPollerPartition) {
+            pollRecords();
+          }
         }
+      } finally {
+        LOG.debug("Exiting poll task");
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
index 9a76103..eb72431 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
@@ -68,8 +68,9 @@ public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<String>
   {
     StringBuilder resultTuple = new StringBuilder();
     try {
-      for (String obj : emitColumns) {
-        resultTuple.append(rs.getObject(obj) + ",");
+      int columnCount = rs.getMetaData().getColumnCount();
+      for (int i = 0; i < columnCount; i++) {
+        resultTuple.append(rs.getObject(i + 1) + ",");
       }
       return resultTuple.substring(0, resultTuple.length() - 1); //remove last comma
     } catch (SQLException e) {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
index bde22f5..c01804f 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
@@ -87,7 +87,7 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
   }
 
   @Test
-  public void testDBPoller() throws InterruptedException
+  public void testDBPoller() throws Exception
   {
     insertEvents(10, true, 0);
 
@@ -179,7 +179,7 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
   {
     int operatorId = 1;
     when(windowDataManagerMock.getLargestCompletedWindow()).thenReturn(1L);
-    when(windowDataManagerMock.retrieve(1)).thenReturn(new MutablePair<Integer, Integer>(0, 4));
+    when(windowDataManagerMock.retrieve(1)).thenReturn(new MutablePair<>(0, 4));
 
     insertEvents(10, true, 0);
 
@@ -207,7 +207,8 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
     inputOperator.setFetchSize(100);
     inputOperator.setBatchSize(100);
     inputOperator.lastEmittedRow = 0; //setting as not calling partition logic
-    inputOperator.rangeQueryPair = new KeyValPair<Integer, Integer>(0, 8);
+    inputOperator.isPollerPartition = true;
+    inputOperator.rangeQueryPair = new KeyValPair<>(0, 8);
 
     inputOperator.outputPort.setup(tpc);
     inputOperator.setScheduledExecutorService(mockscheduler);
@@ -219,15 +220,17 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
     inputOperator.outputPort.setSink(sink);
     inputOperator.beginWindow(0);
     verify(mockscheduler, times(0)).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+    verify(mockscheduler, times(0)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
     inputOperator.emitTuples();
     inputOperator.endWindow();
     inputOperator.beginWindow(1);
     verify(mockscheduler, times(1)).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+    verify(mockscheduler, times(0)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
 
   }
 
   @Test
-  public void testDBPollerExtraField() throws InterruptedException
+  public void testDBPollerExtraField() throws Exception
   {
     insertEvents(10, true, 0);