You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/08/29 05:33:31 UTC

[apex-malhar] branch master updated: APEXMALHAR-2514 JDBC poll offset rebase support.

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git


The following commit(s) were added to refs/heads/master by this push:
     new cae29f8  APEXMALHAR-2514 JDBC poll offset rebase support.
cae29f8 is described below

commit cae29f87a8819eb35d9ad917d6101283a56544d7
Author: Thomas Weise <th...@apache.org>
AuthorDate: Thu Jul 27 11:16:09 2017 -0700

    APEXMALHAR-2514 JDBC poll offset rebase support.
---
 .../lib/db/jdbc/AbstractJdbcPollInputOperator.java | 279 +++++++++++++--------
 .../lib/db/jdbc/JdbcPOJOPollInputOperator.java     |   2 +-
 .../lib/db/jdbc/JdbcPojoPollableOpeartorTest.java  |  80 +++++-
 3 files changed, 259 insertions(+), 102 deletions(-)

diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
index af8f77f..2042ac8 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -24,14 +24,15 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
@@ -85,7 +86,7 @@ import static org.jooq.impl.DSL.field;
  * The operator uses jOOQ to build the SQL queries based on the discovered {@link org.jooq.SQLDialect}.
  * Note that some of the dialects (including Oracle) are only available in commercial
  * jOOQ distributions. If the dialect is not available, a generic translation is applied,
- * you can post-process the generated SQL by overriding {@link #buildRangeQuery(int, int)}.
+ * you can post-process the generated SQL by overriding {@link #buildRangeQuery(Object, int, int)}.
  *
  * @displayName Jdbc Polling Input Operator
  * @category Input
@@ -98,13 +99,14 @@ import static org.jooq.impl.DSL.field;
 public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> implements
     ActivationListener<OperatorContext>, Partitioner<AbstractJdbcPollInputOperator<T>>
 {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
   private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024;
   private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
   private static int DEFAULT_FETCH_SIZE = 20000;
   private static int DEFAULT_BATCH_SIZE = 2000;
   private static int DEFAULT_SLEEP_TIME = 100;
   private static int DEFAULT_RESULT_LIMIT = 100000;
-  private int pollInterval = DEFAULT_POLL_INTERVAL; //in miliseconds
+  private int pollInterval = DEFAULT_POLL_INTERVAL; //in milliseconds
   private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
   private int fetchSize = DEFAULT_FETCH_SIZE;
   /**
@@ -112,8 +114,8 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
    */
   private int resultLimit = DEFAULT_RESULT_LIMIT;
 
-  @Min(1)
-  private int partitionCount = 1;
+  @Min(0)
+  private int partitionCount = 0;
   private int batchSize = DEFAULT_BATCH_SIZE;
 
   @NotNull
@@ -124,25 +126,37 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   private String whereCondition = null;
   private long currentWindowId;
   private WindowDataManager windowManager;
+  protected WindowData currentWindowRecoveryState;
+  private boolean rebaseOffset;
 
   protected KeyValPair<Integer, Integer> rangeQueryPair;
-  protected Integer lowerBound;
   protected Integer lastEmittedRow;
   protected transient DSLContext dslContext;
   private transient volatile boolean execute;
   private transient ScheduledExecutorService scanService;
   private transient ScheduledFuture<?> pollFuture;
-  protected transient LinkedBlockingDeque<T> emitQueue;
+  protected transient LinkedBlockingQueue<T> emitQueue;
   protected transient PreparedStatement ps;
   protected boolean isPollerPartition;
 
-  protected transient MutablePair<Integer, Integer> currentWindowRecoveryState;
-
   private transient int lastOffset;
+  private transient Object prevKey;
+  private transient Object lastKey;
+
+  /**
+   * The candidate key/offset pair identified by the poller thread
+   * that, once emitted, can be used to rebase the lower bound for subsequent queries.
+   */
+  private transient AtomicReference<MutablePair<Object, Integer>> fetchedKeyAndOffset = new AtomicReference<>();
+
+  /**
+   * Signal to the fetch thread to rebase query.
+   */
+  private transient AtomicBoolean adjustKeyAndOffset = new AtomicBoolean();
 
   public AbstractJdbcPollInputOperator()
   {
-    currentWindowRecoveryState = new MutablePair<>();
+    currentWindowRecoveryState = new WindowData();
     windowManager = new FSWindowDataManager();
   }
 
@@ -155,7 +169,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
       scanService = Executors.newScheduledThreadPool(1);
     }
     execute = true;
-    emitQueue = new LinkedBlockingDeque<>(queueCapacity);
+    emitQueue = new LinkedBlockingQueue<>(queueCapacity);
     windowManager.setup(context);
   }
 
@@ -167,10 +181,10 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   @Override
   public void activate(OperatorContext context)
   {
-    initializePreparedStatement();
     long largestRecoveryWindow = windowManager.getLargestCompletedWindow();
     if (largestRecoveryWindow == Stateless.WINDOW_ID
         || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
+      initializePreparedStatement();
       schedulePollTask();
     }
   }
@@ -197,16 +211,18 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   protected void initializePreparedStatement()
   {
     try {
-      // If its a range query pass upper and lower bounds, If its a polling query pass only the lower bound
-      if (isPollerPartition) {
+      if (currentWindowRecoveryState.lowerBound == 0 && currentWindowRecoveryState.key == null) {
         lastOffset = rangeQueryPair.getKey();
       } else {
+        lastOffset = currentWindowRecoveryState.lowerBound;
+        lastKey = currentWindowRecoveryState.key;
+      }
+      if (!isPollerPartition) {
         ps = store.getConnection().prepareStatement(
-            buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+            buildRangeQuery(null, lastOffset, (rangeQueryPair.getValue() - lastOffset)),
             TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
       }
     } catch (SQLException e) {
-      LOG.error("Exception in initializing the range query for a given partition", e);
       throw new RuntimeException(e);
     }
 
@@ -224,7 +240,20 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
         throw new RuntimeException("Replay failed", e);
       }
     }
-    lowerBound = lastEmittedRow;
+
+    currentWindowRecoveryState = WindowData.of(currentWindowRecoveryState.key, lastEmittedRow, 0);
+    if (isPollerPartition) {
+      MutablePair<Object, Integer> keyOffset = fetchedKeyAndOffset.get();
+      if (keyOffset != null && keyOffset.getRight() < lastEmittedRow) {
+        if (!adjustKeyAndOffset.get()) {
+          // rebase offset
+          lastEmittedRow -= keyOffset.getRight();
+          currentWindowRecoveryState.lowerBound = lastEmittedRow;
+          currentWindowRecoveryState.key = keyOffset.getLeft();
+          adjustKeyAndOffset.set(true);
+        }
+      }
+    }
   }
 
   @Override
@@ -246,6 +275,26 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
 
   protected abstract void emitTuple(T tuple);
 
+  /**
+   * Visible to subclasses to allow for custom offset saving and initialization.
+   */
+  protected static class WindowData
+  {
+    // members visible for access in subclasses of poll operator
+    public Object key;
+    public int lowerBound;
+    public int upperBound;
+
+    public static WindowData of(Object key, int lowerBound, int upperBound)
+    {
+      WindowData wd = new WindowData();
+      wd.key = key;
+      wd.lowerBound = lowerBound;
+      wd.upperBound = upperBound;
+      return wd;
+    }
+  }
+
   @Override
   public void endWindow()
   {
@@ -266,7 +315,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
 
     try {
       if (currentWindowId > windowManager.getLargestCompletedWindow()) {
-        currentWindowRecoveryState = new MutablePair<>(lowerBound, lastEmittedRow);
+        currentWindowRecoveryState.upperBound = lastEmittedRow;
         windowManager.save(currentWindowRecoveryState, currentWindowId);
       }
     } catch (IOException e) {
@@ -282,23 +331,45 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     store.disconnect();
   }
 
+  protected Object extractKey(ResultSet rs) throws SQLException
+  {
+    return rs.getObject(this.key);
+  }
+
   /**
    * Execute the query and transfer results to the emit queue.
    * @param preparedStatement PreparedStatement to execute the query and fetch results.
    */
-  protected void insertDbDataInQueue(PreparedStatement preparedStatement) throws SQLException, InterruptedException
+  protected int insertDbDataInQueue(PreparedStatement preparedStatement) throws SQLException, InterruptedException
   {
+    int resultCount = 0;
     preparedStatement.setFetchSize(getFetchSize());
     ResultSet result = preparedStatement.executeQuery();
-    if (result.next()) {
-      do {
-        while (execute && !emitQueue.offer(getTuple(result))) {
-          Thread.sleep(DEFAULT_SLEEP_TIME);
+    while (execute && result.next()) {
+      T obj = getTuple(result);
+      if (obj == null) {
+        continue;
+      }
+      while (execute && !emitQueue.offer(obj)) {
+        Thread.sleep(DEFAULT_SLEEP_TIME);
+      }
+      if (isPollerPartition && rebaseOffset) {
+        if (prevKey == null) {
+          prevKey = extractKey(result);
+        } else if (this.fetchedKeyAndOffset.get() == null) {
+          // track key change
+          Object nextKey = extractKey(result);
+          if (!nextKey.equals(prevKey)) {
+            // new key, ready for rebase (WHERE key > ?)
+            fetchedKeyAndOffset.set(new MutablePair<>(prevKey, lastOffset + resultCount));
+          }
         }
-      } while (execute && result.next());
-      result.close();
+      }
+      resultCount++;
     }
+    result.close();
     preparedStatement.close();
+    return resultCount;
   }
 
   /**
@@ -308,15 +379,21 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   {
     try {
       if (isPollerPartition) {
-        LOG.debug("poll query");
-        int nextOffset = getRecordsCount();
-        while (lastOffset < nextOffset) {
-          PreparedStatement preparedStatement = store.getConnection().prepareStatement(buildRangeQuery(lastOffset, resultLimit),
+        if (adjustKeyAndOffset.get()) {
+          LOG.debug("lastOffset {} lastKey {} rebase {}", lastOffset, lastKey, fetchedKeyAndOffset.get());
+          lastOffset -= fetchedKeyAndOffset.get().getRight();
+          lastKey = fetchedKeyAndOffset.get().getLeft();
+          prevKey = null;
+          fetchedKeyAndOffset.set(null);
+          adjustKeyAndOffset.set(false);
+        }
+        int count = getRecordsCount(lastKey);
+        LOG.debug("Poll count {}", count);
+        while (lastOffset < count) {
+          PreparedStatement preparedStatement = store.getConnection().prepareStatement(buildRangeQuery(lastKey, lastOffset, resultLimit),
               TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
-          insertDbDataInQueue(preparedStatement);
-          lastOffset = lastOffset + resultLimit;
+          lastOffset += insertDbDataInQueue(preparedStatement);
         }
-        lastOffset = nextOffset;
       } else {
         insertDbDataInQueue(ps);
       }
@@ -335,42 +412,21 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   protected void replay(long windowId) throws SQLException
   {
     try {
-      @SuppressWarnings("unchecked")
-      MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.retrieve(windowId);
-
-      if (recoveredData != null && shouldReplayWindow(recoveredData)) {
-        LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left,
-            recoveredData.right);
+      WindowData wd = (WindowData)windowManager.retrieve(windowId);
+      if (wd != null && wd.upperBound - wd.lowerBound > 0) {
+        LOG.debug("[Recovering Window ID - {} for key: {} record range: {}, {}]", windowId, wd.key,
+            wd.lowerBound, wd.upperBound);
         ps = store.getConnection().prepareStatement(
-            buildRangeQuery(recoveredData.left, (recoveredData.right - recoveredData.left)), TYPE_FORWARD_ONLY,
+            buildRangeQuery(wd.key, wd.lowerBound, (wd.upperBound - wd.lowerBound)), TYPE_FORWARD_ONLY,
             CONCUR_READ_ONLY);
         LOG.info("Query formed to recover data - {}", ps.toString());
         emitReplayedTuples(ps);
       }
 
       if (currentWindowId == windowManager.getLargestCompletedWindow()) {
-        try {
-          if (!isPollerPartition && rangeQueryPair.getValue() != null) {
-            ps = store.getConnection().prepareStatement(
-                buildRangeQuery(lastEmittedRow, (rangeQueryPair.getValue() - lastEmittedRow)), TYPE_FORWARD_ONLY,
-                CONCUR_READ_ONLY);
-          } else {
-            Integer bound = null;
-            if (lastEmittedRow == null) {
-              bound = rangeQueryPair.getKey();
-            } else {
-              bound = lastEmittedRow;
-            }
-            ps = store.getConnection().prepareStatement(buildRangeQuery(bound, Integer.MAX_VALUE), TYPE_FORWARD_ONLY,
-                CONCUR_READ_ONLY);
-            if (isPollerPartition) {
-              lastOffset = bound;
-            }
-          }
-          schedulePollTask();
-        } catch (SQLException e) {
-          throw new RuntimeException(e);
-        }
+        currentWindowRecoveryState = WindowData.of(wd.key, wd.upperBound, wd.upperBound);
+        initializePreparedStatement();
+        schedulePollTask();
       }
     } catch (IOException e) {
       throw new RuntimeException("Exception during replay of records.", e);
@@ -378,17 +434,6 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
 
   }
 
-  private boolean shouldReplayWindow(MutablePair<Integer, Integer> recoveredData)
-  {
-    if (recoveredData.left == null || recoveredData.right == null) {
-      return false;
-    }
-    if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(lastEmittedRow)) {
-      return false;
-    }
-    return true;
-  }
-
   /**
    * Replays the tuples in sync mode for replayed windows
    */
@@ -423,11 +468,11 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<>(
         getPartitionCount());
 
-    HashMap<Integer, KeyValPair<Integer, Integer>> partitionToRangeMap = null;
+    final List<KeyValPair<Integer, Integer>> partitionRanges;
     try {
       store.connect();
       dslContext = createDSLContext();
-      partitionToRangeMap = getPartitionedQueryRangeMap(getPartitionCount());
+      partitionRanges = getPartitionedQueryRanges(getPartitionCount());
     } catch (SQLException e) {
       LOG.error("Exception in initializing the partition range", e);
       throw new RuntimeException(e);
@@ -436,24 +481,25 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     }
 
     KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
+    int pollOffset = 0;
 
     // The n given partitions are for range queries and n + 1 partition is for polling query
-    for (int i = 0; i <= getPartitionCount(); i++) {
+    for (KeyValPair<Integer, Integer> range : partitionRanges) {
       AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
-      if (i < getPartitionCount()) {
-        jdbcPoller.rangeQueryPair = partitionToRangeMap.get(i);
-        jdbcPoller.lastEmittedRow = partitionToRangeMap.get(i).getKey();
-        jdbcPoller.isPollerPartition = false;
-      } else {
-        // The upper bound for the n+1 partition is set to null since its a pollable partition
-        int partitionKey = partitionToRangeMap.get(i - 1).getValue();
-        jdbcPoller.rangeQueryPair = new KeyValPair<>(partitionKey, null);
-        jdbcPoller.lastEmittedRow = partitionKey;
-        jdbcPoller.isPollerPartition = true;
-      }
+      jdbcPoller.rangeQueryPair = range;
+      jdbcPoller.lastEmittedRow = range.getKey();
+      jdbcPoller.isPollerPartition = false;
       newPartitions.add(new DefaultPartition<>(jdbcPoller));
+      pollOffset = range.getValue();
     }
 
+    // The upper bound for the n+1 partition is set to null since its a pollable partition
+    AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
+    jdbcPoller.rangeQueryPair = new KeyValPair<>(pollOffset, null);
+    jdbcPoller.lastEmittedRow = pollOffset;
+    jdbcPoller.isPollerPartition = true;
+    newPartitions.add(new DefaultPartition<>(jdbcPoller));
+
     return newPartitions;
   }
 
@@ -464,25 +510,34 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     // Nothing to implement here
   }
 
-  private HashMap<Integer, KeyValPair<Integer, Integer>> getPartitionedQueryRangeMap(int partitions)
+  private List<KeyValPair<Integer, Integer>> getPartitionedQueryRanges(int partitions)
       throws SQLException
   {
+    if (partitions == 0) {
+      return new ArrayList<>(0);
+    }
+
     int rowCount = 0;
     try {
-      rowCount = getRecordsCount();
+      rowCount = getRecordsCount(null);
     } catch (SQLException e) {
       LOG.error("Exception in getting the record range", e);
     }
 
-    HashMap<Integer, KeyValPair<Integer, Integer>> partitionToQueryMap = new HashMap<>();
+    List<KeyValPair<Integer, Integer>> partitionToQueryList = new ArrayList<>();
     int events = (rowCount / partitions);
     for (int i = 0, lowerOffset = 0, upperOffset = events; i < partitions - 1; i++, lowerOffset += events, upperOffset += events) {
-      partitionToQueryMap.put(i, new KeyValPair<>(lowerOffset, upperOffset));
+      partitionToQueryList.add(new KeyValPair<>(lowerOffset, upperOffset));
     }
 
-    partitionToQueryMap.put(partitions - 1, new KeyValPair<>(events * (partitions - 1), rowCount));
-    LOG.info("Partition map - " + partitionToQueryMap.toString());
-    return partitionToQueryMap;
+    partitionToQueryList.add(new KeyValPair<>(events * (partitions - 1), rowCount));
+    LOG.info("Partition ranges - " + partitionToQueryList.toString());
+    return partitionToQueryList;
+  }
+
+  protected Condition andLowerBoundKeyCondition(Condition c, Object lowerBoundKey)
+  {
+    return c.and(this.key + " > ?", lowerBoundKey);
   }
 
   /**
@@ -490,12 +545,17 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
    *
    * @return number of records in table
    */
-  private int getRecordsCount() throws SQLException
+  private int getRecordsCount(Object lowerBoundKey) throws SQLException
   {
     Condition condition = DSL.trueCondition();
     if (getWhereCondition() != null) {
       condition = condition.and(getWhereCondition());
     }
+
+    if (isPollerPartition && lowerBoundKey != null) {
+      condition = andLowerBoundKeyCondition(condition, lowerBoundKey);
+    }
+
     int recordsCount = dslContext.select(DSL.count()).from(getTableName()).where(condition).fetchOne(0, int.class);
     return recordsCount;
   }
@@ -503,13 +563,17 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   /**
    * Helper function returns a range query based on the bounds passed<br>
    */
-  protected String buildRangeQuery(int offset, int limit)
+  protected String buildRangeQuery(Object lowerBoundKey, int offset, int limit)
   {
     Condition condition = DSL.trueCondition();
     if (getWhereCondition() != null) {
       condition = condition.and(getWhereCondition());
     }
 
+    if (isPollerPartition && lowerBoundKey != null) {
+      condition = andLowerBoundKeyCondition(condition, lowerBoundKey);
+    }
+
     String sqlQuery;
     if (getColumnsExpression() != null) {
       Collection<Field<?>> columns = new ArrayList<>();
@@ -563,7 +627,8 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   }
 
   /**
-   * Sets non-polling static partitions count
+   * Sets non-polling static partitions count.<p>
+   * When set to 0, the operator will run in poll mode only.
    *
    * @param partitionCount
    */
@@ -750,6 +815,24 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     this.resultLimit = resultLimit;
   }
 
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
+  public boolean isRebaseOffset()
+  {
+    return rebaseOffset;
+  }
+
+  /**
+   * Whether the query should automatically be augmented with a WHERE
+   * condition for trailing lower bound key value.
+   * <p>
+   * Rebase allows the operator to poll from tables where old data is
+   * periodically purged. Without it, the default zero based row offset would
+   * lead to missed data. The trailing floor is also more efficient when working
+   * with key partitioned sources as the query planner can skip those partitions
+   * below the base key.
+   */
+  public void setRebaseOffset(boolean rebaseOffset)
+  {
+    this.rebaseOffset = rebaseOffset;
+  }
 
 }
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
index 897fe10..9f2c3e3 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
@@ -186,7 +186,7 @@ public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Obj
   protected void populateColumnDataTypes() throws SQLException
   {
     columnDataTypes = Lists.newArrayList();
-    try (PreparedStatement stmt = store.getConnection().prepareStatement(buildRangeQuery(1, 1))) {
+    try (PreparedStatement stmt = store.getConnection().prepareStatement(buildRangeQuery(null, 1, 1))) {
       Map<String, Integer> nameToType = Maps.newHashMap();
       ResultSetMetaData rsMetaData = stmt.getMetaData();
       LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
index 1f8d7db..972dacb 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
@@ -38,7 +38,6 @@ import org.mockito.MockitoAnnotations;
 
 import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.tuple.MutablePair;
 
 import com.google.common.collect.Lists;
 
@@ -47,6 +46,7 @@ import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.db.jdbc.AbstractJdbcPollInputOperator.WindowData;
 import com.datatorrent.lib.helper.TestPortContext;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.FieldInfo;
@@ -70,9 +70,10 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
   private WindowDataManager windowDataManagerMock;
 
   @Before
-  public void beforeTest()
+  public void beforeTest() throws IOException
   {
     dir = "target/" + APP_ID + "/";
+    FileUtils.deleteDirectory(new File(dir));
 
     MockitoAnnotations.initMocks(this);
     when(mockscheduler.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)))
@@ -186,7 +187,7 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
   {
     int operatorId = 1;
     when(windowDataManagerMock.getLargestCompletedWindow()).thenReturn(1L);
-    when(windowDataManagerMock.retrieve(1)).thenReturn(new MutablePair<>(0, 4));
+    when(windowDataManagerMock.retrieve(1)).thenReturn(WindowData.of(null, 0, 4));
 
     insertEvents(10, true, 0);
 
@@ -337,4 +338,77 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
     return fieldInfos;
   }
 
+  @Test
+  public void testPollWithOffsetRebase() throws Exception
+  {
+    insertEvents(0, true, 0); // clear table
+
+    JdbcStore store = new JdbcStore();
+    store.setDatabaseDriver(DB_DRIVER);
+    store.setDatabaseUrl(URL);
+
+    List<FieldInfo> fieldInfos = getFieldInfos();
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+
+    JdbcPOJOPollInputOperator inputOperator = new JdbcPOJOPollInputOperator();
+    inputOperator.setStore(store);
+    inputOperator.setTableName(TABLE_POJO_NAME);
+    inputOperator.setColumnsExpression("ID,STARTDATE,STARTTIME,STARTTIMESTAMP");
+    inputOperator.setKey("id");
+    inputOperator.setFieldInfos(fieldInfos);
+    inputOperator.setFetchSize(100);
+    inputOperator.setBatchSize(100);
+    inputOperator.setPartitionCount(1);
+    inputOperator.setRebaseOffset(true);
+
+    Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newPartitions = inputOperator
+        .definePartitions(new ArrayList<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>>(), null);
+
+    int operatorId = 0;
+    for (com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>> partition : newPartitions) {
+
+      Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+      partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID);
+      partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
+
+      OperatorContext partitioningContext = mockOperatorContext(operatorId++, partitionAttributeMap);
+
+      JdbcPOJOPollInputOperator parition = (JdbcPOJOPollInputOperator)partition.getPartitionedInstance();
+      parition.outputPort.setup(tpc);
+      parition.setScheduledExecutorService(mockscheduler);
+      parition.setup(partitioningContext);
+      parition.activate(partitioningContext);
+    }
+
+    Iterator<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> itr = newPartitions
+        .iterator();
+    // First partition is for range queries,last is for polling queries
+    JdbcPOJOPollInputOperator firstInstance = (JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance();
+
+    int rows = 0;
+    int windowId = 0;
+    insertEvents(4, false, rows);
+    rows += 4;
+    JdbcPOJOPollInputOperator poller = (JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance();
+    CollectorTestSink<Object> sink3 = new CollectorTestSink<>();
+    poller.outputPort.setSink(sink3);
+    poller.beginWindow(windowId++);
+    poller.pollRecords();
+    poller.emitTuples();
+    Assert.assertEquals("emitted", rows, sink3.collectedTuples.size());
+    poller.endWindow();
+
+    insertEvents(1, false, rows);
+    rows += 1;
+    poller.beginWindow(windowId++);
+    poller.pollRecords(); // offset rebase, fetch 1 record
+    poller.emitTuples();
+    Assert.assertEquals("emitted", rows, sink3.collectedTuples.size());
+    poller.endWindow();
+
+  }
+
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.