You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/08/11 11:37:22 UTC

[2/2] apex-malhar git commit: APEXMALHAR-2172: Updates to JDBC Poll Input Operator

APEXMALHAR-2172: Updates to JDBC Poll Input Operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/26fa9d78
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/26fa9d78
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/26fa9d78

Branch: refs/heads/master
Commit: 26fa9d781e032fd61fbb2972a99874e77e0c509c
Parents: 3316d6a
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Tue Jul 19 12:10:08 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Thu Aug 11 15:56:42 2016 +0530

----------------------------------------------------------------------
 library/pom.xml                                 |   5 +
 .../db/jdbc/AbstractJdbcPollInputOperator.java  | 915 ++++++++++---------
 .../lib/db/jdbc/JdbcMetaDataUtility.java        | 353 -------
 .../lib/db/jdbc/JdbcPOJOPollInputOperator.java  | 325 +++++++
 .../lib/db/jdbc/JdbcPollInputOperator.java      | 186 +---
 .../lib/db/jdbc/JdbcOperatorTest.java           | 627 +------------
 .../lib/db/jdbc/JdbcPojoOperatorTest.java       | 606 ++++++++++++
 .../db/jdbc/JdbcPojoPollableOpeartorTest.java   | 241 +++++
 .../datatorrent/lib/db/jdbc/JdbcPollerTest.java | 246 -----
 9 files changed, 1707 insertions(+), 1797 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index b8c6271..a6bdf64 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -341,6 +341,11 @@
       <version>7.0.6</version>
     </dependency>
     <dependency>
+      <groupId>org.jooq</groupId>
+      <artifactId>jooq</artifactId>
+      <version>3.6.4</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.apex</groupId>
       <artifactId>apex-shaded-ning19</artifactId>
       <version>1.0.0</version>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
index ab12ed3..234e28c 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -27,11 +27,23 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.validation.constraints.Min;
-
+import javax.validation.constraints.NotNull;
+
+import org.jooq.Condition;
+import org.jooq.DSLContext;
+import org.jooq.Field;
+import org.jooq.SelectField;
+import org.jooq.conf.ParamType;
+import org.jooq.impl.DSL;
+import org.jooq.tools.jdbc.JDBCUtils;
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
@@ -39,242 +51,86 @@ import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultPartition;
 import com.datatorrent.api.Operator.ActivationListener;
-import com.datatorrent.api.Operator.IdleTimeHandler;
 import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.db.AbstractStoreInputOperator;
 import com.datatorrent.lib.util.KeyValPair;
 import com.datatorrent.lib.util.KryoCloneUtils;
 import com.datatorrent.netlet.util.DTThrowable;
 
+import static java.sql.ResultSet.CONCUR_READ_ONLY;
+import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
+import static org.jooq.impl.DSL.field;
+
 /**
- * Abstract operator for for consuming data using JDBC interface<br>
- * User needs User needs to provide
- * tableName,dbConnection,setEmitColumnList,look-up key <br>
- * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
- * <br>
- * This operator uses static partitioning to arrive at range queries for exactly
- * once reads<br>
+ * Abstract operator for consuming data using JDBC interface<br>
+ * User needs to provide tableName, dbConnection, columnsExpression, look-up key<br>
+ * Optionally batchSize, pollInterval and a where clause can be given <br>
+ * This operator uses static partitioning to arrive at range queries for
+ * idempotent reads<br>
  * This operator will create a configured number of non-polling static
  * partitions for fetching the existing data in the table. And an additional
  * single partition for polling additive data. Assumption is that there is an
- * ordered column using which range queries can be formed<br>
- * If an emitColumnList is provided, please ensure that the keyColumn is the
- * first column in the list<br>
- * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
- * comma separated list of the emit columns eg columnA,columnB,columnC<br>
- * Only newly added data which has increasing ids will be fetched by the polling
- * jdbc partition
+ * ordered unique column using which range queries can be formed<br>
  * 
- * In the next iterations this operator would support an in-clause for
- * idempotency instead of having only range query support to support non ordered
- * key columns
+ * Only newly added data will be fetched by the polling jdbc partition; the
+ * assumption is that rows won't be added or deleted in the middle of a scan.
  * 
  * 
  * @displayName Jdbc Polling Input Operator
  * @category Input
- * @tags database, sql, jdbc, partitionable,exactlyOnce
+ * @tags database, sql, jdbc, partitionable, idempotent, pollable
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore>
-    implements ActivationListener<Context>, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator<T>>
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> implements
+    ActivationListener<OperatorContext>, Partitioner<AbstractJdbcPollInputOperator<T>>
 {
-  /**
-   * poll interval in milliseconds
-   */
-  private static int pollInterval = 10000;
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 20000;
+  private static int DEFAULT_BATCH_SIZE = 2000;
+  private static int DEFAULT_SLEEP_TIME = 100;
+  private int pollInterval = DEFAULT_POLL_INTERVAL; //in milliseconds
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
-  protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
-  protected int batchSize;
-  protected static int fetchSize = 20000;
-  /**
-   * Map of windowId to <lower bound,upper bound> of the range key
-   */
-  protected transient MutablePair<String, String> currentWindowRecoveryState;
-
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  @NotNull
+  private String tableName;
+  @NotNull
+  private String columnsExpression;
+  @NotNull
+  private String key;
+  private String whereCondition = null;
+  private long currentWindowId;
+  private WindowDataManager windowManager;
+
+  protected KeyValPair<Integer, Integer> rangeQueryPair;
+  protected Integer lowerBound;
+  protected Integer lastEmittedRow;
+  private transient int operatorId;
+  private transient DSLContext create;
   private transient volatile boolean execute;
-  private transient AtomicReference<Throwable> cause;
-  protected transient int spinMillis;
-  private transient OperatorContext context;
-  protected String tableName;
-  protected String key;
-  protected long currentWindowId;
-  protected KeyValPair<String, String> rangeQueryPair;
-  protected String lower;
-  protected String upper;
-  protected boolean recovered;
-  protected boolean isPolled;
-  protected String whereCondition = null;
-  protected String previousUpperBound;
-  protected String highestPolled;
-  private static final String user = "";
-  private static final String password = "";
-  /**
-   * thread to poll database
-   */
-  private transient Thread dbPoller;
-  protected transient ArrayBlockingQueue<List<T>> emitQueue;
+  private transient ScheduledExecutorService scanService;
+  private transient AtomicReference<Throwable> threadException;
+  protected transient boolean isPolled;
+  protected transient Integer lastPolledRow;
+  protected transient LinkedBlockingDeque<T> emitQueue;
   protected transient PreparedStatement ps;
-  protected WindowDataManager windowManager;
-  private String emitColumnList;
-
-  /**
-   * Returns the where clause
-   */
-  public String getWhereCondition()
-  {
-    return whereCondition;
-  }
-
-  /**
-   * Sets the where clause
-   */
-  public void setWhereCondition(String whereCondition)
-  {
-    this.whereCondition = whereCondition;
-  }
-
-  /**
-   * Returns the list of columns to select from the query
-   */
-  public String getEmitColumnList()
-  {
-    return emitColumnList;
-  }
-
-  /**
-   * Comma separated list of columns to select from the given table
-   */
-  public void setEmitColumnList(String emitColumnList)
-  {
-    this.emitColumnList = emitColumnList;
-  }
-
-  /**
-   * Returns the fetchsize for getting the results
-   */
-  public int getFetchSize()
-  {
-    return fetchSize;
-  }
-
-  /**
-   * Sets the fetchsize for getting the results
-   */
-  public void setFetchSize(int fetchSize)
-  {
-    this.fetchSize = fetchSize;
-  }
-
-  protected abstract void pollRecords(PreparedStatement ps);
-
-  /**
-   * Returns the interval for polling the queue
-   */
-  public int getPollInterval()
-  {
-    return pollInterval;
-  }
-
-  /**
-   * Sets the interval for polling the emit queue
-   */
-  public void setPollInterval(int pollInterval)
-  {
-    this.pollInterval = pollInterval;
-  }
-
-  /**
-   * Returns the capacity of the emit queue
-   */
-  public int getQueueCapacity()
-  {
-    return queueCapacity;
-  }
-
-  /**
-   * Sets the capacity of the emit queue
-   */
-  public void setQueueCapacity(int queueCapacity)
-  {
-    this.queueCapacity = queueCapacity;
-  }
-
-  /**
-   * Returns the ordered key used to generate the range queries
-   */
-  public String getKey()
-  {
-    return key;
-  }
-
-  /**
-   * Sets the ordered key used to generate the range queries
-   */
-  public void setKey(String key)
-  {
-    this.key = key;
-  }
+  protected boolean isPollerPartition;
 
-  /**
-   * Returns the tableName which would be queried
-   */
-  public String getTableName()
-  {
-    return tableName;
-  }
-
-  /**
-   * Sets the tableName to query
-   */
-  public void setTableName(String tableName)
-  {
-    this.tableName = tableName;
-  }
-
-  /**
-   * Returns rangeQueryPair - <lowerBound,upperBound>
-   */
-  public KeyValPair<String, String> getRangeQueryPair()
-  {
-    return rangeQueryPair;
-  }
-
-  /**
-   * Sets the rangeQueryPair <lowerBound,upperBound>
-   */
-  public void setRangeQueryPair(KeyValPair<String, String> rangeQueryPair)
-  {
-    this.rangeQueryPair = rangeQueryPair;
-  }
-
-  /**
-   * Returns batchSize indicating the number of elements in emitQueue
-   */
-  public int getBatchSize()
-  {
-    return batchSize;
-  }
-
-  /**
-   * Sets batchSize for number of elements in the emitQueue
-   */
-  public void setBatchSize(int batchSize)
-  {
-    this.batchSize = batchSize;
-  }
+  protected transient MutablePair<Integer, Integer> currentWindowRecoveryState;
 
   public AbstractJdbcPollInputOperator()
   {
@@ -286,271 +142,234 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   public void setup(OperatorContext context)
   {
     super.setup(context);
-    spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+    intializeDSLContext();
+    if (scanService == null) {
+      scanService = Executors.newScheduledThreadPool(1);
+    }
     execute = true;
-    cause = new AtomicReference<Throwable>();
-    emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
-    this.context = context;
+    emitQueue = new LinkedBlockingDeque<>(queueCapacity);
     operatorId = context.getId();
+    windowManager.setup(context);
+  }
 
-    try {
+  private void intializeDSLContext()
+  {
+    create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl()));
+  }
+
+  @Override
+  public void activate(OperatorContext context)
+  {
+    initializePreparedStatement();
+    long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
+    if (largestRecoveryWindow == Stateless.WINDOW_ID
+        || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
+      scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
+    }
+  }
 
-      //If its a range query pass upper and lower bounds
-      //If its a polling query pass only the lower bound
-      if (getRangeQueryPair().getValue() != null) {
-        ps = store.getConnection()
-            .prepareStatement(
-                JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(),
-                    rangeQueryPair.getValue()),
-                java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
+  protected void initializePreparedStatement()
+  {
+    try {
+      // If it's a range query, pass upper and lower bounds; if it's a polling query, pass only the lower bound
+      if (isPollerPartition) {
+        ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), Integer.MAX_VALUE),
+            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
       } else {
         ps = store.getConnection().prepareStatement(
-            JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey()),
-            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
-        isPollable = true;
+            buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
       }
-
     } catch (SQLException e) {
       LOG.error("Exception in initializing the range query for a given partition", e);
       throw new RuntimeException(e);
     }
 
-    windowManager.setup(context);
-    LOG.debug("super setup done...");
   }
 
   @Override
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;
-
-    isReplayed = false;
-
     if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
       try {
         replay(currentWindowId);
+        return;
       } catch (SQLException e) {
         LOG.error("Exception in replayed windows", e);
         throw new RuntimeException(e);
       }
     }
-
-    if (isReplayed && currentWindowId == windowManager.getLargestRecoveryWindow()) {
-      try {
-        if (!isPollable && rangeQueryPair.getValue() != null) {
-
-          ps = store.getConnection().prepareStatement(
-              JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), getKey(), previousUpperBound,
-                  rangeQueryPair.getValue()),
-              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
-        } else {
-          String bound = null;
-          if (previousUpperBound == null) {
-            bound = getRangeQueryPair().getKey();
-          } else {
-            bound = previousUpperBound;
-          }
-          ps = store.getConnection().prepareStatement(
-              JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
-              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
-          isPollable = true;
-        }
-        isReplayed = false;
-        LOG.debug("Prepared statement after re-initialization - {} ", ps.toString());
-      } catch (SQLException e) {
-        // TODO Auto-generated catch block
-        throw new RuntimeException(e);
-      }
+    if (isPollerPartition) {
+      updatePollQuery();
+      isPolled = false;
     }
+    lowerBound = lastEmittedRow;
+  }
 
-    //Reset the pollable query with the updated upper and lower bounds
-    if (isPollable) {
+  private void updatePollQuery()
+  {
+    if ((lastPolledRow != lastEmittedRow)) {
+      if (lastEmittedRow == null) {
+        lastPolledRow = rangeQueryPair.getKey();
+      } else {
+        lastPolledRow = lastEmittedRow;
+      }
       try {
-        String bound = null;
-        if (previousUpperBound == null && highestPolled == null) {
-          bound = getRangeQueryPair().getKey();
-        } else {
-          bound = highestPolled;
-        }
-        ps = store.getConnection().prepareStatement(
-            JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
-            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
-        LOG.debug("Polling query {} {}", ps.toString(), currentWindowId);
-        isPolled = false;
+        ps = store.getConnection().prepareStatement(buildRangeQuery(lastPolledRow, Integer.MAX_VALUE),
+            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
       } catch (SQLException e) {
         throw new RuntimeException(e);
       }
     }
 
-    lower = null;
-    upper = null;
-
-    //Check if a thread is already active and start only if its no
-    //Do not start the thread from setup, will conflict with the replay
-    if (dbPoller == null && !isReplayed) {
-      //If this is not a replayed state, reset the ps to highest read offset + 1, 
-      //keep the upper bound as the one that was initialized after static partitioning
-      LOG.info("Statement when re-initialized {}", ps.toString());
-      dbPoller = new Thread(new DBPoller());
-      dbPoller.start();
-    }
   }
 
   @Override
   public void emitTuples()
   {
-    if (isReplayed) {
+    if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
       return;
     }
-
-    List<T> tuples;
-
-    if ((tuples = emitQueue.poll()) != null) {
-      for (Object tuple : tuples) {
-        if (lower == null) {
-          lower = tuple.toString();
-        }
-        upper = tuple.toString();
-        outputPort.emit((T)tuple);
+    int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() : batchSize;
+    while (pollSize-- > 0) {
+      T obj = emitQueue.poll();
+      if (obj != null) {
+        emitTuple(obj);
       }
+      lastEmittedRow++;
     }
   }
 
+  protected abstract void emitTuple(T tuple);
+
   @Override
   public void endWindow()
   {
     try {
       if (currentWindowId > windowManager.getLargestRecoveryWindow()) {
-        if (currentWindowRecoveryState == null) {
-          currentWindowRecoveryState = new MutablePair<String, String>();
-        }
-        if (lower != null && upper != null) {
-          previousUpperBound = upper;
-          isPolled = true;
-        }
-        MutablePair<String, String> windowBoundaryPair = new MutablePair<>(lower, upper);
-        currentWindowRecoveryState = windowBoundaryPair;
+        currentWindowRecoveryState = new MutablePair<>(lowerBound, lastEmittedRow);
         windowManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
       }
     } catch (IOException e) {
       throw new RuntimeException("saving recovery", e);
     }
-    currentWindowRecoveryState = new MutablePair<>();
-  }
-
-  public int getPartitionCount()
-  {
-    return partitionCount;
+    if (threadException != null) {
+      store.disconnect();
+      DTThrowable.rethrow(threadException.get());
+    }
   }
 
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-    this.partitionCount = partitionCount;
+    scanService.shutdownNow();
+    store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID
-        && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) {
-      // If it is a replay state, don't start any threads here
+    if (isPolled) {
       return;
     }
-  }
-
-  @Override
-  public void deactivate()
-  {
     try {
-      if (dbPoller != null && dbPoller.isAlive()) {
-        dbPoller.interrupt();
-        dbPoller.join();
+      ps.setFetchSize(getFetchSize());
+      ResultSet result = ps.executeQuery();
+      if (result.next()) {
+        do {
+          while (!emitQueue.offer(getTuple(result))) {
+            Thread.sleep(DEFAULT_SLEEP_TIME);
+          }
+        } while (result.next());
       }
-    } catch (InterruptedException ex) {
-      // log and ignore, ending execution anyway
-      LOG.error("exception in poller thread: ", ex);
-    }
-  }
-
-  @Override
-  public void handleIdleTime()
-  {
-    if (execute) {
-      try {
-        Thread.sleep(spinMillis);
-      } catch (InterruptedException ie) {
-        throw new RuntimeException(ie);
+      isPolled = true;
+    } catch (SQLException ex) {
+      execute = false;
+      threadException = new AtomicReference<Throwable>(ex);
+    } catch (InterruptedException e) {
+      threadException = new AtomicReference<Throwable>(e);
+    } finally {
+      if (!isPollerPartition) {
+        store.disconnect();
       }
-    } else {
-      LOG.error("Exception: ", cause);
-      DTThrowable.rethrow(cause.get());
     }
   }
 
+  public abstract T getTuple(ResultSet result);
+
   protected void replay(long windowId) throws SQLException
   {
-    isReplayed = true;
 
-    MutablePair<String, String> recoveredData = new MutablePair<String, String>();
     try {
-      recoveredData = (MutablePair<String, String>)windowManager.load(operatorId, windowId);
+      MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.load(operatorId,
+          windowId);
 
-      if (recoveredData != null) {
-        //skip the window and return if there was no incoming data in the window
-        if (recoveredData.left == null || recoveredData.right == null) {
-          return;
-        }
+      if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+        LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left,
+            recoveredData.right);
 
-        if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(previousUpperBound)) {
-          LOG.info("Matched so returning");
-          return;
-        }
-
-        JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
-        jdbcPoller.setStore(store);
-        jdbcPoller.setKey(getKey());
-        jdbcPoller.setPartitionCount(getPartitionCount());
-        jdbcPoller.setPollInterval(getPollInterval());
-        jdbcPoller.setTableName(getTableName());
-        jdbcPoller.setBatchSize(getBatchSize());
-        isPollable = false;
-
-        LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + "," + recoveredData.right + "]");
+        ps = store.getConnection().prepareStatement(
+            buildRangeQuery(recoveredData.left, (recoveredData.right - recoveredData.left)), TYPE_FORWARD_ONLY,
+            CONCUR_READ_ONLY);
+        LOG.info("Query formed to recover data - {}", ps.toString());
 
-        jdbcPoller.setRangeQueryPair(new KeyValPair<String, String>(recoveredData.left, recoveredData.right));
+        emitReplayedTuples(ps);
 
-        jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement(
-            JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(), jdbcPoller.getKey(),
-                jdbcPoller.getRangeQueryPair().getKey(), jdbcPoller.getRangeQueryPair().getValue()),
-            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
-        LOG.info("Query formed for recovered data - {}", jdbcPoller.ps.toString());
+      }
 
-        emitReplayedTuples(jdbcPoller.ps);
+      if (currentWindowId == windowManager.getLargestRecoveryWindow()) {
+        try {
+          if (!isPollerPartition && rangeQueryPair.getValue() != null) {
+            ps = store.getConnection().prepareStatement(
+                buildRangeQuery(lastEmittedRow, (rangeQueryPair.getValue() - lastEmittedRow)), TYPE_FORWARD_ONLY,
+                CONCUR_READ_ONLY);
+          } else {
+            Integer bound = null;
+            if (lastEmittedRow == null) {
+              bound = rangeQueryPair.getKey();
+            } else {
+              bound = lastEmittedRow;
+            }
+            ps = store.getConnection().prepareStatement(buildRangeQuery(bound, Integer.MAX_VALUE), TYPE_FORWARD_ONLY,
+                CONCUR_READ_ONLY);
+          }
+          scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
+        } catch (SQLException e) {
+          throw new RuntimeException(e);
+        }
       }
     } catch (IOException e) {
-      throw new RuntimeException("replay", e);
+      throw new RuntimeException("Exception during replay of records.", e);
     }
 
   }
 
+  private boolean shouldReplayWindow(MutablePair<Integer, Integer> recoveredData)
+  {
+    if (recoveredData.left == null || recoveredData.right == null) {
+      return false;
+    }
+    if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(lastEmittedRow)) {
+      return false;
+    }
+    return true;
+  }
+
   /**
    * Replays the tuples in sync mode for replayed windows
    */
   public void emitReplayedTuples(PreparedStatement ps)
   {
-    LOG.debug("Emitting replayed statement is -" + ps.toString());
     ResultSet rs = null;
     try (PreparedStatement pStat = ps;) {
       pStat.setFetchSize(getFetchSize());
-      LOG.debug("sql query = {}", pStat);
       rs = pStat.executeQuery();
       if (rs == null || rs.isClosed()) {
-        LOG.debug("Nothing to replay");
         return;
       }
       while (rs.next()) {
-        previousUpperBound = rs.getObject(getKey()).toString();
-        outputPort.emit((T)rs.getObject(getKey()));
+        emitTuple(getTuple(rs));
+        lastEmittedRow++;
       }
     } catch (SQLException ex) {
       throw new RuntimeException(ex);
@@ -565,59 +384,42 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
    */
   @Override
   public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
-      Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions,
-      com.datatorrent.api.Partitioner.PartitioningContext context)
+      Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions, PartitioningContext context)
   {
     List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(
         getPartitionCount());
-    JdbcStore jdbcStore = new JdbcStore();
-    jdbcStore.setDatabaseDriver(store.getDatabaseDriver());
-    jdbcStore.setDatabaseUrl(store.getDatabaseUrl());
-    jdbcStore.setConnectionProperties(store.getConnectionProperties());
-
-    jdbcStore.connect();
 
-    HashMap<Integer, KeyValPair<String, String>> partitionToRangeMap = null;
+    HashMap<Integer, KeyValPair<Integer, Integer>> partitionToRangeMap = null;
     try {
-      partitionToRangeMap = JdbcMetaDataUtility.getPartitionedQueryMap(getPartitionCount(),
-          jdbcStore.getDatabaseDriver(), jdbcStore.getDatabaseUrl(), getTableName(), getKey(),
-          store.getConnectionProperties().getProperty(user), store.getConnectionProperties().getProperty(password),
-          whereCondition, emitColumnList);
+      store.connect();
+      intializeDSLContext();
+      partitionToRangeMap = getPartitionedQueryRangeMap(getPartitionCount());
     } catch (SQLException e) {
       LOG.error("Exception in initializing the partition range", e);
+      throw new RuntimeException(e);
+    } finally {
+      store.disconnect();
     }
 
     KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
 
+    // The n given partitions are for range queries and the (n + 1)th partition is for the polling query
     for (int i = 0; i <= getPartitionCount(); i++) {
-      AbstractJdbcPollInputOperator<T> jdbcPoller = null;
-
-      jdbcPoller = cloneUtils.getClone();
-
-      jdbcPoller.setStore(store);
-      jdbcPoller.setKey(getKey());
-      jdbcPoller.setPartitionCount(getPartitionCount());
-      jdbcPoller.setPollInterval(getPollInterval());
-      jdbcPoller.setTableName(getTableName());
-      jdbcPoller.setBatchSize(getBatchSize());
-      jdbcPoller.setEmitColumnList(getEmitColumnList());
-
-      store.connect();
-      //The n given partitions are for range queries and n + 1 partition is for polling query
-      //The upper bound for the n+1 partition is set to null since its a pollable partition
+      AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
       if (i < getPartitionCount()) {
-        jdbcPoller.setRangeQueryPair(partitionToRangeMap.get(i));
-        isPollable = false;
+        jdbcPoller.rangeQueryPair = partitionToRangeMap.get(i);
+        jdbcPoller.lastEmittedRow = partitionToRangeMap.get(i).getKey();
+        jdbcPoller.isPollerPartition = false;
       } else {
-        jdbcPoller.setRangeQueryPair(new KeyValPair<String, String>(partitionToRangeMap.get(i - 1).getValue(), null));
-        isPollable = true;
+        // The upper bound for the (n + 1)th partition is set to null since it's a pollable partition
+        int partitionKey = partitionToRangeMap.get(i - 1).getValue();
+        jdbcPoller.rangeQueryPair = new KeyValPair<Integer, Integer>(partitionKey, null);
+        jdbcPoller.lastEmittedRow = partitionKey;
+        jdbcPoller.isPollerPartition = true;
       }
-      Partition<AbstractJdbcPollInputOperator<T>> po = new DefaultPartition<AbstractJdbcPollInputOperator<T>>(
-          jdbcPoller);
-      newPartitions.add(po);
+      newPartitions.add(new DefaultPartition<AbstractJdbcPollInputOperator<T>>(jdbcPoller));
     }
 
-    previousUpperBound = null;
     return newPartitions;
   }
 
@@ -625,7 +427,69 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   public void partitioned(
       Map<Integer, com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions)
   {
-    //Nothing to implement here
+    // Nothing to implement here
+  }
+
+  private HashMap<Integer, KeyValPair<Integer, Integer>> getPartitionedQueryRangeMap(int partitions)
+      throws SQLException
+  {
+    int rowCount = 0;
+    try {
+      rowCount = getRecordsCount();
+    } catch (SQLException e) {
+      LOG.error("Exception in getting the record range", e);
+    }
+
+    HashMap<Integer, KeyValPair<Integer, Integer>> partitionToQueryMap = new HashMap<>();
+    int events = (rowCount / partitions);
+    for (int i = 0, lowerOffset = 0, upperOffset = events; i < partitions - 1; i++, lowerOffset += events, upperOffset += events) {
+      partitionToQueryMap.put(i, new KeyValPair<Integer, Integer>(lowerOffset, upperOffset));
+    }
+
+    partitionToQueryMap.put(partitions - 1, new KeyValPair<Integer, Integer>(events * (partitions - 1), (int)rowCount));
+    LOG.info("Partition map - " + partitionToQueryMap.toString());
+    return partitionToQueryMap;
+  }
+
+  /**
+   * Finds the total number of rows in the table
+   *
+   * @return number of records in table
+   */
+  private int getRecordsCount() throws SQLException
+  {
+    Condition condition = DSL.trueCondition();
+    if (getWhereCondition() != null) {
+      condition = condition.and(getWhereCondition());
+    }
+    int recordsCount = create.select(DSL.count()).from(getTableName()).where(condition).fetchOne(0, int.class);
+    return recordsCount;
+  }
+
+  /**
+   * Helper function returns a range query based on the bounds passed<br>
+   */
+  protected String buildRangeQuery(int offset, int limit)
+  {
+    Condition condition = DSL.trueCondition();
+    if (getWhereCondition() != null) {
+      condition = condition.and(getWhereCondition());
+    }
+
+    String sqlQuery;
+    if (getColumnsExpression() != null) {
+      Collection<Field<?>> columns = new ArrayList<>();
+      for (String column : getColumnsExpression().split(",")) {
+        columns.add(field(column));
+      }
+      sqlQuery = create.select((Collection<? extends SelectField<?>>)columns).from(getTableName()).where(condition)
+          .orderBy(field(getKey())).limit(limit).offset(offset).getSQL(ParamType.INLINED);
+    } else {
+      sqlQuery = create.select().from(getTableName()).where(condition).orderBy(field(getKey())).limit(limit)
+          .offset(offset).getSQL(ParamType.INLINED);
+    }
+    LOG.info("DSL Query: " + sqlQuery);
+    return sqlQuery;
   }
 
   /**
@@ -638,24 +502,219 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     public void run()
     {
       while (execute) {
-        try {
-          long startTs = System.currentTimeMillis();
-          if ((isPollable && !isPolled) || !isPollable) {
-            pollRecords(ps);
-          }
-          long endTs = System.currentTimeMillis();
-          long ioTime = endTs - startTs;
-          long sleepTime = pollInterval - ioTime;
-          LOG.debug("pollInterval = {} , I/O time = {} , sleepTime = {}", pollInterval, ioTime, sleepTime);
-          Thread.sleep(sleepTime > 0 ? sleepTime : 0);
-        } catch (Exception ex) {
-          cause.set(ex);
-          execute = false;
+        if ((isPollerPartition && !isPolled) || !isPollerPartition) {
+          pollRecords();
         }
       }
     }
   }
 
-  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
+  @VisibleForTesting
+  protected void setScheduledExecutorService(ScheduledExecutorService service)
+  {
+    scanService = service;
+  }
+
+  /**
+   * Gets {@link WindowDataManager}
+   *
+   * @return windowDataManager
+   */
+  public WindowDataManager getWindowManager()
+  {
+    return windowManager;
+  }
+
+  /**
+   * Sets {@link WindowDataManager}
+   *
+   * @param windowDataManager
+   */
+  public void setWindowManager(WindowDataManager windowDataManager)
+  {
+    this.windowManager = windowDataManager;
+  }
+
+  /**
+   * Gets non-polling static partitions count
+   *
+   * @return partitionCount
+   */
+  public int getPartitionCount()
+  {
+    return partitionCount;
+  }
+
+  /**
+   * Sets non-polling static partitions count
+   *
+   * @param partitionCount
+   */
+  public void setPartitionCount(int partitionCount)
+  {
+    this.partitionCount = partitionCount;
+  }
+
+  /**
+   * Returns the where clause
+   *
+   * @return whereCondition
+   */
+  public String getWhereCondition()
+  {
+    return whereCondition;
+  }
+
+  /**
+   * Sets the where clause
+   *
+   * @param whereCondition
+   */
+  public void setWhereCondition(String whereCondition)
+  {
+    this.whereCondition = whereCondition;
+  }
+
+  /**
+   * Returns the list of columns to select from the table
+   *
+   * @return columnsExpression
+   */
+  public String getColumnsExpression()
+  {
+    return columnsExpression;
+  }
+
+  /**
+   * Comma separated list of columns to select from the given table
+   *
+   * @param columnsExpression
+   */
+  public void setColumnsExpression(String columnsExpression)
+  {
+    this.columnsExpression = columnsExpression;
+  }
+
+  /**
+   * Returns the fetchsize for getting the results
+   *
+   * @return fetchSize
+   */
+  public int getFetchSize()
+  {
+    return fetchSize;
+  }
+
+  /**
+   * Sets the fetchsize for getting the results
+   *
+   * @param fetchSize
+   */
+  public void setFetchSize(int fetchSize)
+  {
+    this.fetchSize = fetchSize;
+  }
+
+  /**
+   * Returns the interval for polling the DB
+   *
+   * @return pollInterval
+   */
+  public int getPollInterval()
+  {
+    return pollInterval;
+  }
+
+  /**
+   * Sets the interval for polling the DB
+   *
+   * @param pollInterval
+   */
+  public void setPollInterval(int pollInterval)
+  {
+    this.pollInterval = pollInterval;
+  }
+
+  /**
+   * Returns the capacity of the emit queue
+   *
+   * @return queueCapacity
+   */
+  public int getQueueCapacity()
+  {
+    return queueCapacity;
+  }
+
+  /**
+   * Sets the capacity of the emit queue
+   *
+   * @param queueCapacity
+   */
+  public void setQueueCapacity(int queueCapacity)
+  {
+    this.queueCapacity = queueCapacity;
+  }
+
+  /**
+   * Returns the tableName which would be queried
+   *
+   * @return tableName
+   */
+  public String getTableName()
+  {
+    return tableName;
+  }
+
+  /**
+   * Sets the tableName to query
+   *
+   * @param tableName
+   */
+  public void setTableName(String tableName)
+  {
+    this.tableName = tableName;
+  }
+
+  /**
+   * Returns batchSize indicating the number of elements to emit in a batch
+   *
+   * @return batchSize
+   */
+  public int getBatchSize()
+  {
+    return batchSize;
+  }
+
+  /**
+   * Sets batchSize for number of elements to emit in a batch
+   *
+   * @param batchSize
+   */
+  public void setBatchSize(int batchSize)
+  {
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Gets primary key column name
+   *
+   * @return key
+   */
+  public String getKey()
+  {
+    return key;
+  }
+
+  /**
+   * Sets primary key column name
+   *
+   * @param key
+   */
+  public void setKey(String key)
+  {
+    this.key = key;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java
deleted file mode 100644
index 9ba353c..0000000
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.db.jdbc;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
-
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * A utility class used to retrieve the metadata for a given unique key of a SQL
- * table. This class would emit range queries based on a primary index given
- * 
- * LIMIT clause may not work with all databases or may not return the same
- * results always<br>
- * 
- * This utility has been tested with MySQL and where clause is supported<br>
- * 
- * @Input - dbName,tableName, primaryKey
- * @Output - map<operatorId,prepared statement>
- *
- */
-@Evolving
-public class JdbcMetaDataUtility
-{
-  private static String DB_DRIVER = "com.mysql.jdbc.Driver";
-  private static String DB_CONNECTION = "";
-  private static String DB_USER = "";
-  private static String DB_PASSWORD = "";
-  private static String TABLE_NAME = "";
-  private static String KEY_COLUMN = "";
-  private static String WHERE_CLAUSE = null;
-  private static String COLUMN_LIST = null;
-
-  private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
-
-  public JdbcMetaDataUtility()
-  {
-
-  }
-
-  public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
-  {
-    DB_CONNECTION = dbConnection;
-    DB_USER = userName;
-    DB_PASSWORD = password;
-    TABLE_NAME = tableName;
-    KEY_COLUMN = key;
-  }
-
-  /**
-   * Returns the database connection handle
-   * */
-  private static Connection getDBConnection()
-  {
-
-    Connection dbConnection = null;
-
-    try {
-      Class.forName(DB_DRIVER);
-    } catch (ClassNotFoundException e) {
-      LOG.error("Driver not found", e);
-      throw new RuntimeException(e);
-    }
-
-    try {
-      dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
-      return dbConnection;
-    } catch (SQLException e) {
-      LOG.error("Exception in getting connection handle", e);
-      throw new RuntimeException(e);
-    }
-
-  }
-
-  private static String generateQueryString()
-  {
-    StringBuilder sb = new StringBuilder();
-    sb.append("SELECT COUNT(*) as RowCount from " + TABLE_NAME);
-
-    if (WHERE_CLAUSE != null) {
-      sb.append(" WHERE " + WHERE_CLAUSE);
-    }
-
-    return sb.toString();
-  }
-
-  /**
-   * Finds the total number of rows in the table
-   */
-  private static long getRecordRange(String query) throws SQLException
-  {
-    long rowCount = 0;
-    Connection dbConnection = null;
-    PreparedStatement preparedStatement = null;
-
-    try {
-      dbConnection = getDBConnection();
-      preparedStatement = dbConnection.prepareStatement(query);
-
-      ResultSet rs = preparedStatement.executeQuery();
-
-      while (rs.next()) {
-        rowCount = Long.parseLong(rs.getString(1));
-        LOG.debug("# Rows - " + rowCount);
-      }
-
-    } catch (SQLException e) {
-      LOG.error("Exception in retreiving result set", e);
-      throw new RuntimeException(e);
-    } finally {
-      if (preparedStatement != null) {
-        preparedStatement.close();
-      }
-      if (dbConnection != null) {
-        dbConnection.close();
-      }
-    }
-    return rowCount;
-  }
-
-  /**
-   * Returns a pair of <upper,lower> bounds for each partition of the
-   * {@link JdbcPollInputOperator}}
-   */
-  private static KeyValPair<String, String> getQueryBounds(long lower, long upper) throws SQLException
-  {
-    Connection dbConnection = null;
-    PreparedStatement psLowerBound = null;
-    PreparedStatement psUpperBound = null;
-
-    StringBuilder lowerBound = new StringBuilder();
-    StringBuilder upperBound = new StringBuilder();
-
-    KeyValPair<String, String> boundedQUeryPair = null;
-
-    try {
-      dbConnection = getDBConnection();
-
-      /*
-       * Run this loop only for n-1 partitions.
-       * By default the last partition will have fewer records, since we are rounding off
-       * */
-
-      lowerBound.append("SELECT " + KEY_COLUMN + " FROM " + TABLE_NAME);
-      upperBound.append("SELECT " + KEY_COLUMN + " FROM " + TABLE_NAME);
-
-      if (WHERE_CLAUSE != null) {
-        lowerBound.append(" WHERE " + WHERE_CLAUSE);
-        upperBound.append(" WHERE " + WHERE_CLAUSE);
-      }
-
-      lowerBound.append(" LIMIT " + (0 + lower) + ",1");
-      upperBound.append(" LIMIT " + (upper - 1) + ",1");
-
-      psLowerBound = dbConnection.prepareStatement(lowerBound.toString());
-      psUpperBound = dbConnection.prepareStatement(upperBound.toString());
-
-      ResultSet rsLower = psLowerBound.executeQuery();
-      ResultSet rsUpper = psUpperBound.executeQuery();
-
-      String lowerVal = null;
-      String upperVal = null;
-
-      while (rsLower.next()) {
-        lowerVal = rsLower.getString(KEY_COLUMN);
-      }
-
-      while (rsUpper.next()) {
-        upperVal = rsUpper.getString(KEY_COLUMN);
-      }
-
-      boundedQUeryPair = new KeyValPair<String, String>(lowerVal, upperVal);
-
-    } catch (SQLException e) {
-      LOG.error("Exception in getting bounds for queries");
-      throw new RuntimeException(e);
-    } finally {
-      if (psLowerBound != null) {
-        psLowerBound.close();
-      }
-      if (psUpperBound != null) {
-        psUpperBound.close();
-      }
-      if (dbConnection != null) {
-        dbConnection.close();
-      }
-    }
-    return boundedQUeryPair;
-
-  }
-
-  /**
-   * Returns a map of partitionId to a KeyValPair of <lower,upper> of the query
-   * range<br>
-   * Ensures even distribution of records across partitions except the last
-   * partition By default the last partition for batch queries will have fewer
-   * records, since we are rounding off
-   * 
-   * @throws SQLException
-   * 
-   */
-  private static HashMap<Integer, KeyValPair<String, String>> getRangeQueries(int numberOfPartitions, int events,
-      long rowCount) throws SQLException
-  {
-    HashMap<Integer, KeyValPair<String, String>> partitionToQueryMap = new HashMap<Integer, KeyValPair<String, String>>();
-    for (int i = 0, lowerOffset = 0, upperOffset = events; i < numberOfPartitions
-        - 1; i++, lowerOffset += events, upperOffset += events) {
-
-      partitionToQueryMap.put(i, getQueryBounds(lowerOffset, upperOffset));
-    }
-
-    //Call to construct the lower and upper bounds for the last partition
-    partitionToQueryMap.put(numberOfPartitions - 1, getQueryBounds(events * (numberOfPartitions - 1), rowCount));
-
-    LOG.info("Partition map - " + partitionToQueryMap.toString());
-
-    return partitionToQueryMap;
-  }
-
-  /**
-   * Helper function returns a range query based on the bounds passed<br>
-   * Invoked from the setup method of {@link - AbstractJdbcPollInputOperator} to
-   * initialize the preparedStatement in the given operator<br>
-   * Optional whereClause for conditional selections Optional columnList for
-   * projection
-   */
-  public static String buildRangeQuery(String tableName, String keyColumn, String lower, String upper)
-  {
-
-    StringBuilder sb = new StringBuilder();
-    sb.append("SELECT ");
-    if (COLUMN_LIST != null) {
-      sb.append(COLUMN_LIST);
-    } else {
-      sb.append("*");
-    }
-    sb.append(" from " + tableName + " WHERE ");
-    if (WHERE_CLAUSE != null) {
-      sb.append(WHERE_CLAUSE + " AND ");
-    }
-    sb.append(keyColumn + " BETWEEN '" + lower + "' AND '" + upper + "'");
-    return sb.toString();
-  }
-
-  /**
-   * Helper function that constructs a query from the next highest key after an
-   * operator is restarted
-   */
-  public static String buildGTRangeQuery(String tableName, String keyColumn, String lower, String upper)
-  {
-    StringBuilder sb = new StringBuilder();
-    sb.append("SELECT ");
-    if (COLUMN_LIST != null) {
-      sb.append(COLUMN_LIST);
-    } else {
-      sb.append("*");
-    }
-    sb.append(" from " + tableName + " WHERE ");
-    if (WHERE_CLAUSE != null) {
-      sb.append(WHERE_CLAUSE + " AND ");
-    }
-    sb.append(keyColumn + " > '" + lower + "' AND " + keyColumn + " <= '" + upper + "'");
-    return sb.toString();
-  }
-
-  /**
-   * Helper function that constructs a query for polling from outside the given
-   * range
-   */
-  public static String buildPollableQuery(String tableName, String keyColumn, String lower)
-  {
-    StringBuilder sb = new StringBuilder();
-    sb.append("SELECT ");
-    if (COLUMN_LIST != null) {
-      sb.append(COLUMN_LIST);
-    } else {
-      sb.append("*");
-    }
-    sb.append(" from " + tableName + " WHERE ");
-    if (WHERE_CLAUSE != null) {
-      sb.append(WHERE_CLAUSE + " AND ");
-    }
-    sb.append(keyColumn + " > '" + lower + "' ");
-    return sb.toString();
-  }
-
-  /**
-   * Called by the partitioner from {@link - AbstractJdbcPollInputOperator}<br>
-   * Finds the range of query per partition<br>
-   * Returns a map of partitionId to PreparedStatement based on the range
-   * computed
-   * 
-   * @throws SQLException
-   */
-  public static HashMap<Integer, KeyValPair<String, String>> getPartitionedQueryMap(int partitions, String dbDriver,
-      String dbConnection, String tableName, String key, String userName, String password, String whereClause,
-      String columnList) throws SQLException
-  {
-    long rowCount = 0L;
-    try {
-      DB_CONNECTION = dbConnection;
-      DB_USER = userName;
-      DB_PASSWORD = password;
-      TABLE_NAME = tableName;
-      KEY_COLUMN = key;
-      WHERE_CLAUSE = whereClause;
-      COLUMN_LIST = columnList;
-      DB_DRIVER = dbDriver;
-      rowCount = getRecordRange(generateQueryString());
-    } catch (SQLException e) {
-      LOG.error("Exception in getting the record range", e);
-    }
-    return getRangeQueries(partitions, getOffset(rowCount, partitions), rowCount);
-  }
-
-  /**
-   * Returns the rounded offset to arrive at a range query
-   */
-  private static int getOffset(long rowCount, int partitions)
-  {
-    if (rowCount % partitions == 0) {
-      return (int)(rowCount / partitions);
-    } else {
-      return (int)((rowCount - (rowCount % (partitions - 1))) / (partitions - 1));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
new file mode 100644
index 0000000..91821be
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator.ActiveFieldInfo;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * A concrete implementation for {@link AbstractJdbcPollInputOperator} to
+ * consume data from jdbc store and emit POJO for each record <br>
+ *
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc
+ */
+@Evolving
+public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Object>
+{
+  private final transient List<ActiveFieldInfo> columnFieldSetters = Lists.newArrayList();
+  protected List<Integer> columnDataTypes;
+  protected transient Class<?> pojoClass;
+  @NotNull
+  private List<FieldInfo> fieldInfos;
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    try {
+      // build the select column list from fieldInfos when no columns expression was configured
+      if (getColumnsExpression() == null) {
+        StringBuilder columns = new StringBuilder();
+        for (int i = 0; i < fieldInfos.size(); i++) {
+          columns.append(fieldInfos.get(i).getColumnName());
+          if (i < fieldInfos.size() - 1) {
+            columns.append(",");
+          }
+        }
+        setColumnsExpression(columns.toString());
+        LOG.debug("select expr {}", columns.toString());
+      }
+
+      if (columnDataTypes == null) {
+        populateColumnDataTypes();
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+
+    for (FieldInfo fi : fieldInfos) {
+      columnFieldSetters.add(new ActiveFieldInfo(fi));
+    }
+  }
+
+  @Override
+  public void activate(Context.OperatorContext context)
+  {
+    for (int i = 0; i < columnDataTypes.size(); i++) {
+      final int type = columnDataTypes.get(i);
+      JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldSetters.get(i);
+      switch (type) {
+        case (Types.CHAR):
+        case (Types.VARCHAR):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(), String.class);
+          break;
+
+        case (Types.BOOLEAN):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        case (Types.TINYINT):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        case (Types.SMALLINT):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        case (Types.INTEGER):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        case (Types.BIGINT):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        case (Types.FLOAT):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        case (Types.DOUBLE):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterDouble(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        case Types.DECIMAL:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(), BigDecimal.class);
+          break;
+
+        case Types.TIMESTAMP:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(), Timestamp.class);
+          break;
+
+        case Types.TIME:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(), Time.class);
+          break;
+
+        case Types.DATE:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(), Date.class);
+          break;
+
+        default:
+          throw new RuntimeException("unsupported data type " + type);
+      }
+    }
+    super.activate(context);
+  }
+
+  protected void populateColumnDataTypes() throws SQLException
+  {
+    columnDataTypes = Lists.newArrayList();
+    try (PreparedStatement stmt = store.getConnection().prepareStatement(buildRangeQuery(1, 1));
+        ResultSet rs = stmt.executeQuery()) { // statement and result set are both closed on exit, even on failure
+      Map<String, Integer> nameToType = Maps.newHashMap();
+      ResultSetMetaData rsMetaData = rs.getMetaData();
+      LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
+
+      for (int i = 1; i <= rsMetaData.getColumnCount(); i++) { // JDBC column indexes are 1-based
+        int type = rsMetaData.getColumnType(i);
+        String name = rsMetaData.getColumnName(i);
+        LOG.debug("column name {} type {}", name, type);
+        nameToType.put(name, type);
+      }
+
+      for (FieldInfo fieldInfo : fieldInfos) { // record the types in fieldInfos order
+        columnDataTypes.add(nameToType.get(fieldInfo.getColumnName()));
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object getTuple(ResultSet result)
+  {
+    Object obj;
+    try {
+      obj = pojoClass.newInstance();
+    } catch (InstantiationException | IllegalAccessException ex) {
+      store.disconnect();
+      throw new RuntimeException(ex);
+    }
+
+    try {
+      for (int i = 0; i < fieldInfos.size(); i++) {
+        int type = columnDataTypes.get(i);
+        ActiveFieldInfo afi = columnFieldSetters.get(i);
+        switch (type) {
+          case Types.CHAR:
+          case Types.VARCHAR:
+            String strVal = result.getString(i + 1);
+            ((PojoUtils.Setter<Object, String>)afi.setterOrGetter).set(obj, strVal);
+            break;
+
+          case Types.BOOLEAN:
+            boolean boolVal = result.getBoolean(i + 1);
+            ((PojoUtils.SetterBoolean<Object>)afi.setterOrGetter).set(obj, boolVal);
+            break;
+
+          case Types.TINYINT:
+            byte byteVal = result.getByte(i + 1);
+            ((PojoUtils.SetterByte<Object>)afi.setterOrGetter).set(obj, byteVal);
+            break;
+
+          case Types.SMALLINT:
+            short shortVal = result.getShort(i + 1);
+            ((PojoUtils.SetterShort<Object>)afi.setterOrGetter).set(obj, shortVal);
+            break;
+
+          case Types.INTEGER:
+            int intVal = result.getInt(i + 1);
+            ((PojoUtils.SetterInt<Object>)afi.setterOrGetter).set(obj, intVal);
+            break;
+
+          case Types.BIGINT:
+            long longVal = result.getLong(i + 1);
+            ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(obj, longVal);
+            break;
+
+          case Types.FLOAT:
+            float floatVal = result.getFloat(i + 1);
+            ((PojoUtils.SetterFloat<Object>)afi.setterOrGetter).set(obj, floatVal);
+            break;
+
+          case Types.DOUBLE:
+            double doubleVal = result.getDouble(i + 1);
+            ((PojoUtils.SetterDouble<Object>)afi.setterOrGetter).set(obj, doubleVal);
+            break;
+
+          case Types.DECIMAL:
+            BigDecimal bdVal = result.getBigDecimal(i + 1);
+            ((PojoUtils.Setter<Object, BigDecimal>)afi.setterOrGetter).set(obj, bdVal);
+            break;
+
+          case Types.TIMESTAMP:
+            Timestamp tsVal = result.getTimestamp(i + 1);
+            ((PojoUtils.Setter<Object, Timestamp>)afi.setterOrGetter).set(obj, tsVal);
+            break;
+
+          case Types.TIME:
+            Time timeVal = result.getTime(i + 1);
+            ((PojoUtils.Setter<Object, Time>)afi.setterOrGetter).set(obj, timeVal);
+            break;
+
+          case Types.DATE:
+            Date dateVal = result.getDate(i + 1);
+            ((PojoUtils.Setter<Object, Date>)afi.setterOrGetter).set(obj, dateVal);
+            break;
+
+          default:
+            throw new RuntimeException("unsupported data type " + type);
+        }
+      }
+      return obj;
+    } catch (SQLException e) {
+      store.disconnect();
+      throw new RuntimeException("fetching metadata", e);
+    }
+  }
+
+  @Override
+  protected void emitTuple(Object obj)
+  {
+    outputPort.emit(obj);
+  }
+
+  /**
+   * A list of {@link FieldInfo}s where each item maps a column name to a pojo
+   * field name.
+   */
+  public List<FieldInfo> getFieldInfos()
+  {
+    return fieldInfos;
+  }
+
+  /**
+   * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a
+   * pojo field name.<br/>
+   * The value from fieldInfo.column is assigned to
+   * fieldInfo.pojoFieldExpression.
+   *
+   * @description $[].columnName name of the database column name
+   * @description $[].pojoFieldExpression pojo field name or expression
+   * @useSchema $[].pojoFieldExpression outputPort.fields[].name
+   */
+  public void setFieldInfos(List<FieldInfo> fieldInfos)
+  {
+    this.fieldInfos = fieldInfos;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOPollInputOperator.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
index 45f96bf..518ac17 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
@@ -18,214 +18,66 @@
  */
 package com.datatorrent.lib.db.jdbc;
 
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
 import com.google.common.collect.Lists;
 
 import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 
 /**
- * A concrete implementation for {@link AbstractJdbcPollInputOperator}} for
- * consuming data from MySQL using JDBC interface <br>
- * User needs to provide tableName,dbConnection,setEmitColumnList,look-up key
- * <br>
- * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
- * <br>
- * This operator uses static partitioning to arrive at range queries for exactly
- * once reads<br>
- * Assumption is that there is an ordered column using which range queries can
- * be formed<br>
- * If an emitColumnList is provided, please ensure that the keyColumn is the
- * first column in the list<br>
- * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
- * comma separated list of the emit columns eg columnA,columnB,columnC
+ * A concrete implementation of {@link AbstractJdbcPollInputOperator} that
+ * consumes data from a JDBC store and emits comma-separated values.<br>
  * 
  * @displayName Jdbc Polling Input Operator
  * @category Input
  * @tags database, sql, jdbc
  */
 @Evolving
-@OperatorAnnotation(checkpointableWithinAppWindow = false)
-public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<Object>
+public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<String>
 {
-  private long lastBatchWindowId;
-  private transient long currentWindowId;
-  private long lastCreationTsMillis;
-  private long fetchBackMillis = 0L;
+  @OutputPortFieldAnnotation
+  public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();
   private ArrayList<String> emitColumns;
-  private transient int count = 0;
-
-  /**
-   * Returns the emit columns
-   */
-  public List<String> getEmitColumns()
-  {
-    return emitColumns;
-  }
-
-  /**
-   * Sets the emit columns
-   */
-  public void setEmitColumns(ArrayList<String> emitColumns)
-  {
-    this.emitColumns = emitColumns;
-  }
-
-  /**
-   * Returns fetchBackMilis
-   */
-  public long getFetchBackMillis()
-  {
-    return fetchBackMillis;
-  }
-
-  /**
-   * Sets fetchBackMilis - used in polling
-   */
-  public void setFetchBackMillis(long fetchBackMillis)
-  {
-    this.fetchBackMillis = fetchBackMillis;
-  }
 
   @Override
   public void setup(OperatorContext context)
   {
     super.setup(context);
-    parseEmitColumnList(getEmitColumnList());
-    lastCreationTsMillis = System.currentTimeMillis() - fetchBackMillis;
+    parseEmitColumnList();
   }
 
-  private void parseEmitColumnList(String columnList)
+  private void parseEmitColumnList()
   {
-    String[] cols = columnList.split(",");
-    ArrayList<String> arr = Lists.newArrayList();
+    String[] cols = getColumnsExpression().split(",");
+    emitColumns = Lists.newArrayList();
     for (int i = 0; i < cols.length; i++) {
-      arr.add(cols[i]);
+      emitColumns.add(cols[i]);
     }
-    setEmitColumns(arr);
-  }
-
-  @Override
-  public void beginWindow(long l)
-  {
-    super.beginWindow(l);
-    currentWindowId = l;
   }
 
   @Override
-  protected void pollRecords(PreparedStatement ps)
+  public String getTuple(ResultSet rs)
   {
-    ResultSet rs = null;
-    List<Object> metaList = new ArrayList<>();
-
-    if (isReplayed) {
-      return;
-    }
-
+    StringBuilder resultTuple = new StringBuilder();
     try {
-      if (ps.isClosed()) {
-        LOG.debug("Returning due to closed ps for non-pollable partitions");
-        return;
+      for (String obj : emitColumns) {
+        resultTuple.append(rs.getObject(obj) + ",");
       }
+      return resultTuple.substring(0, resultTuple.length() - 1); //remove last comma
     } catch (SQLException e) {
-      LOG.error("Prepared statement is closed", e);
       throw new RuntimeException(e);
     }
-
-    try (PreparedStatement pStat = ps;) {
-      pStat.setFetchSize(getFetchSize());
-      LOG.debug("sql query = {}", pStat);
-      rs = pStat.executeQuery();
-      boolean hasNext = false;
-
-      if (rs == null || rs.isClosed()) {
-        return;
-      }
-
-      while ((hasNext = rs.next())) {
-        Object key = null;
-        StringBuilder resultTuple = new StringBuilder();
-        try {
-          if (count < getBatchSize()) {
-            key = rs.getObject(getKey());
-            for (String obj : emitColumns) {
-              resultTuple.append(rs.getObject(obj) + ",");
-            }
-            metaList.add(resultTuple.substring(0, resultTuple.length() - 1));
-            count++;
-          } else {
-            emitQueue.add(metaList);
-            metaList = new ArrayList<>();
-            key = rs.getObject(getKey());
-            for (String obj : emitColumns) {
-              resultTuple.append(rs.getObject(obj) + ",");
-            }
-            metaList.add(resultTuple.substring(0, resultTuple.length() - 1));
-            count = 0;
-          }
-        } catch (NullPointerException npe) {
-          LOG.error("Key not found" + npe);
-          throw new RuntimeException(npe);
-        }
-        if (isPollable) {
-          highestPolled = key.toString();
-          isPolled = true;
-        }
-      }
-      /*Flush the remaining records once the result set is over and batch-size is not reached,
-       * Dont flush if its pollable*/
-      if (!hasNext) {
-        if ((isPollable && isPolled) || !isPollable) {
-          emitQueue.offer(metaList);
-          metaList = new ArrayList<>();
-          count = 0;
-        }
-        if (!isPolled) {
-          isPolled = true;
-        }
-      }
-      LOG.debug("last creation time stamp = {}", lastCreationTsMillis);
-    } catch (SQLException ex) {
-      throw new RuntimeException(ex);
-    }
   }
 
   @Override
-  public void emitTuples()
+  protected void emitTuple(String tuple)
   {
-    if (isReplayed) {
-      LOG.debug(
-          "Calling emit tuples during window - " + currentWindowId + "::" + windowManager.getLargestRecoveryWindow());
-      LOG.debug("Returning for replayed window");
-      return;
-    }
-
-    List<Object> tuples;
-
-    if (lastBatchWindowId < currentWindowId) {
-      if ((tuples = emitQueue.poll()) != null) {
-        for (Object tuple : tuples) {
-          String[] str = tuple.toString().split(",");
-          if (lower == null) {
-            lower = str[0];
-          }
-          upper = str[0];
-          outputPort.emit(tuple);
-        }
-        lastBatchWindowId = currentWindowId;
-      }
-    }
+    outputPort.emit(tuple);
   }
-
-  private static final Logger LOG = LoggerFactory.getLogger(JdbcPollInputOperator.class);
 }