You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/08/29 05:33:31 UTC
[apex-malhar] branch master updated: APEXMALHAR-2514 JDBC poll offset rebase support.
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push:
new cae29f8 APEXMALHAR-2514 JDBC poll offset rebase support.
cae29f8 is described below
commit cae29f87a8819eb35d9ad917d6101283a56544d7
Author: Thomas Weise <th...@apache.org>
AuthorDate: Thu Jul 27 11:16:09 2017 -0700
APEXMALHAR-2514 JDBC poll offset rebase support.
---
.../lib/db/jdbc/AbstractJdbcPollInputOperator.java | 279 +++++++++++++--------
.../lib/db/jdbc/JdbcPOJOPollInputOperator.java | 2 +-
.../lib/db/jdbc/JdbcPojoPollableOpeartorTest.java | 80 +++++-
3 files changed, 259 insertions(+), 102 deletions(-)
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
index af8f77f..2042ac8 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -24,14 +24,15 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
@@ -85,7 +86,7 @@ import static org.jooq.impl.DSL.field;
* The operator uses jOOQ to build the SQL queries based on the discovered {@link org.jooq.SQLDialect}.
* Note that some of the dialects (including Oracle) are only available in commercial
* jOOQ distributions. If the dialect is not available, a generic translation is applied,
- * you can post-process the generated SQL by overriding {@link #buildRangeQuery(int, int)}.
+ * you can post-process the generated SQL by overriding {@link #buildRangeQuery(Object, int, int)}.
*
* @displayName Jdbc Polling Input Operator
* @category Input
@@ -98,13 +99,14 @@ import static org.jooq.impl.DSL.field;
public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> implements
ActivationListener<OperatorContext>, Partitioner<AbstractJdbcPollInputOperator<T>>
{
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024;
private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
private static int DEFAULT_FETCH_SIZE = 20000;
private static int DEFAULT_BATCH_SIZE = 2000;
private static int DEFAULT_SLEEP_TIME = 100;
private static int DEFAULT_RESULT_LIMIT = 100000;
- private int pollInterval = DEFAULT_POLL_INTERVAL; //in miliseconds
+ private int pollInterval = DEFAULT_POLL_INTERVAL; //in milliseconds
private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
private int fetchSize = DEFAULT_FETCH_SIZE;
/**
@@ -112,8 +114,8 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
*/
private int resultLimit = DEFAULT_RESULT_LIMIT;
- @Min(1)
- private int partitionCount = 1;
+ @Min(0)
+ private int partitionCount = 0;
private int batchSize = DEFAULT_BATCH_SIZE;
@NotNull
@@ -124,25 +126,37 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
private String whereCondition = null;
private long currentWindowId;
private WindowDataManager windowManager;
+ protected WindowData currentWindowRecoveryState;
+ private boolean rebaseOffset;
protected KeyValPair<Integer, Integer> rangeQueryPair;
- protected Integer lowerBound;
protected Integer lastEmittedRow;
protected transient DSLContext dslContext;
private transient volatile boolean execute;
private transient ScheduledExecutorService scanService;
private transient ScheduledFuture<?> pollFuture;
- protected transient LinkedBlockingDeque<T> emitQueue;
+ protected transient LinkedBlockingQueue<T> emitQueue;
protected transient PreparedStatement ps;
protected boolean isPollerPartition;
- protected transient MutablePair<Integer, Integer> currentWindowRecoveryState;
-
private transient int lastOffset;
+ private transient Object prevKey;
+ private transient Object lastKey;
+
+ /**
+ * The candidate key/offset pair identified by the poller thread
+ * that, once emitted, can be used to rebase the lower bound for subsequent queries.
+ */
+ private transient AtomicReference<MutablePair<Object, Integer>> fetchedKeyAndOffset = new AtomicReference<>();
+
+ /**
+ * Signal to the fetch thread to rebase query.
+ */
+ private transient AtomicBoolean adjustKeyAndOffset = new AtomicBoolean();
public AbstractJdbcPollInputOperator()
{
- currentWindowRecoveryState = new MutablePair<>();
+ currentWindowRecoveryState = new WindowData();
windowManager = new FSWindowDataManager();
}
@@ -155,7 +169,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
scanService = Executors.newScheduledThreadPool(1);
}
execute = true;
- emitQueue = new LinkedBlockingDeque<>(queueCapacity);
+ emitQueue = new LinkedBlockingQueue<>(queueCapacity);
windowManager.setup(context);
}
@@ -167,10 +181,10 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
@Override
public void activate(OperatorContext context)
{
- initializePreparedStatement();
long largestRecoveryWindow = windowManager.getLargestCompletedWindow();
if (largestRecoveryWindow == Stateless.WINDOW_ID
|| context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
+ initializePreparedStatement();
schedulePollTask();
}
}
@@ -197,16 +211,18 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
protected void initializePreparedStatement()
{
try {
- // If its a range query pass upper and lower bounds, If its a polling query pass only the lower bound
- if (isPollerPartition) {
+ if (currentWindowRecoveryState.lowerBound == 0 && currentWindowRecoveryState.key == null) {
lastOffset = rangeQueryPair.getKey();
} else {
+ lastOffset = currentWindowRecoveryState.lowerBound;
+ lastKey = currentWindowRecoveryState.key;
+ }
+ if (!isPollerPartition) {
ps = store.getConnection().prepareStatement(
- buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+ buildRangeQuery(null, lastOffset, (rangeQueryPair.getValue() - lastOffset)),
TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
}
} catch (SQLException e) {
- LOG.error("Exception in initializing the range query for a given partition", e);
throw new RuntimeException(e);
}
@@ -224,7 +240,20 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
throw new RuntimeException("Replay failed", e);
}
}
- lowerBound = lastEmittedRow;
+
+ currentWindowRecoveryState = WindowData.of(currentWindowRecoveryState.key, lastEmittedRow, 0);
+ if (isPollerPartition) {
+ MutablePair<Object, Integer> keyOffset = fetchedKeyAndOffset.get();
+ if (keyOffset != null && keyOffset.getRight() < lastEmittedRow) {
+ if (!adjustKeyAndOffset.get()) {
+ // rebase offset
+ lastEmittedRow -= keyOffset.getRight();
+ currentWindowRecoveryState.lowerBound = lastEmittedRow;
+ currentWindowRecoveryState.key = keyOffset.getLeft();
+ adjustKeyAndOffset.set(true);
+ }
+ }
+ }
}
@Override
@@ -246,6 +275,26 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
protected abstract void emitTuple(T tuple);
+ /**
+ * Visible to subclasses to allow for custom offset saving and initialization.
+ */
+ protected static class WindowData
+ {
+ // members visible for access in subclasses of poll operator
+ public Object key;
+ public int lowerBound;
+ public int upperBound;
+
+ public static WindowData of(Object key, int lowerBound, int upperBound)
+ {
+ WindowData wd = new WindowData();
+ wd.key = key;
+ wd.lowerBound = lowerBound;
+ wd.upperBound = upperBound;
+ return wd;
+ }
+ }
+
@Override
public void endWindow()
{
@@ -266,7 +315,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
try {
if (currentWindowId > windowManager.getLargestCompletedWindow()) {
- currentWindowRecoveryState = new MutablePair<>(lowerBound, lastEmittedRow);
+ currentWindowRecoveryState.upperBound = lastEmittedRow;
windowManager.save(currentWindowRecoveryState, currentWindowId);
}
} catch (IOException e) {
@@ -282,23 +331,45 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
store.disconnect();
}
+ protected Object extractKey(ResultSet rs) throws SQLException
+ {
+ return rs.getObject(this.key);
+ }
+
/**
* Execute the query and transfer results to the emit queue.
* @param preparedStatement PreparedStatement to execute the query and fetch results.
*/
- protected void insertDbDataInQueue(PreparedStatement preparedStatement) throws SQLException, InterruptedException
+ protected int insertDbDataInQueue(PreparedStatement preparedStatement) throws SQLException, InterruptedException
{
+ int resultCount = 0;
preparedStatement.setFetchSize(getFetchSize());
ResultSet result = preparedStatement.executeQuery();
- if (result.next()) {
- do {
- while (execute && !emitQueue.offer(getTuple(result))) {
- Thread.sleep(DEFAULT_SLEEP_TIME);
+ while (execute && result.next()) {
+ T obj = getTuple(result);
+ if (obj == null) {
+ continue;
+ }
+ while (execute && !emitQueue.offer(obj)) {
+ Thread.sleep(DEFAULT_SLEEP_TIME);
+ }
+ if (isPollerPartition && rebaseOffset) {
+ if (prevKey == null) {
+ prevKey = extractKey(result);
+ } else if (this.fetchedKeyAndOffset.get() == null) {
+ // track key change
+ Object nextKey = extractKey(result);
+ if (!nextKey.equals(prevKey)) {
+ // new key, ready for rebase (WHERE key > ?)
+ fetchedKeyAndOffset.set(new MutablePair<>(prevKey, lastOffset + resultCount));
+ }
}
- } while (execute && result.next());
- result.close();
+ }
+ resultCount++;
}
+ result.close();
preparedStatement.close();
+ return resultCount;
}
/**
@@ -308,15 +379,21 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
{
try {
if (isPollerPartition) {
- LOG.debug("poll query");
- int nextOffset = getRecordsCount();
- while (lastOffset < nextOffset) {
- PreparedStatement preparedStatement = store.getConnection().prepareStatement(buildRangeQuery(lastOffset, resultLimit),
+ if (adjustKeyAndOffset.get()) {
+ LOG.debug("lastOffset {} lastKey {} rebase {}", lastOffset, lastKey, fetchedKeyAndOffset.get());
+ lastOffset -= fetchedKeyAndOffset.get().getRight();
+ lastKey = fetchedKeyAndOffset.get().getLeft();
+ prevKey = null;
+ fetchedKeyAndOffset.set(null);
+ adjustKeyAndOffset.set(false);
+ }
+ int count = getRecordsCount(lastKey);
+ LOG.debug("Poll count {}", count);
+ while (lastOffset < count) {
+ PreparedStatement preparedStatement = store.getConnection().prepareStatement(buildRangeQuery(lastKey, lastOffset, resultLimit),
TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
- insertDbDataInQueue(preparedStatement);
- lastOffset = lastOffset + resultLimit;
+ lastOffset += insertDbDataInQueue(preparedStatement);
}
- lastOffset = nextOffset;
} else {
insertDbDataInQueue(ps);
}
@@ -335,42 +412,21 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
protected void replay(long windowId) throws SQLException
{
try {
- @SuppressWarnings("unchecked")
- MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.retrieve(windowId);
-
- if (recoveredData != null && shouldReplayWindow(recoveredData)) {
- LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left,
- recoveredData.right);
+ WindowData wd = (WindowData)windowManager.retrieve(windowId);
+ if (wd != null && wd.upperBound - wd.lowerBound > 0) {
+ LOG.debug("[Recovering Window ID - {} for key: {} record range: {}, {}]", windowId, wd.key,
+ wd.lowerBound, wd.upperBound);
ps = store.getConnection().prepareStatement(
- buildRangeQuery(recoveredData.left, (recoveredData.right - recoveredData.left)), TYPE_FORWARD_ONLY,
+ buildRangeQuery(wd.key, wd.lowerBound, (wd.upperBound - wd.lowerBound)), TYPE_FORWARD_ONLY,
CONCUR_READ_ONLY);
LOG.info("Query formed to recover data - {}", ps.toString());
emitReplayedTuples(ps);
}
if (currentWindowId == windowManager.getLargestCompletedWindow()) {
- try {
- if (!isPollerPartition && rangeQueryPair.getValue() != null) {
- ps = store.getConnection().prepareStatement(
- buildRangeQuery(lastEmittedRow, (rangeQueryPair.getValue() - lastEmittedRow)), TYPE_FORWARD_ONLY,
- CONCUR_READ_ONLY);
- } else {
- Integer bound = null;
- if (lastEmittedRow == null) {
- bound = rangeQueryPair.getKey();
- } else {
- bound = lastEmittedRow;
- }
- ps = store.getConnection().prepareStatement(buildRangeQuery(bound, Integer.MAX_VALUE), TYPE_FORWARD_ONLY,
- CONCUR_READ_ONLY);
- if (isPollerPartition) {
- lastOffset = bound;
- }
- }
- schedulePollTask();
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
+ currentWindowRecoveryState = WindowData.of(wd.key, wd.upperBound, wd.upperBound);
+ initializePreparedStatement();
+ schedulePollTask();
}
} catch (IOException e) {
throw new RuntimeException("Exception during replay of records.", e);
@@ -378,17 +434,6 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
}
- private boolean shouldReplayWindow(MutablePair<Integer, Integer> recoveredData)
- {
- if (recoveredData.left == null || recoveredData.right == null) {
- return false;
- }
- if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(lastEmittedRow)) {
- return false;
- }
- return true;
- }
-
/**
* Replays the tuples in sync mode for replayed windows
*/
@@ -423,11 +468,11 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<>(
getPartitionCount());
- HashMap<Integer, KeyValPair<Integer, Integer>> partitionToRangeMap = null;
+ final List<KeyValPair<Integer, Integer>> partitionRanges;
try {
store.connect();
dslContext = createDSLContext();
- partitionToRangeMap = getPartitionedQueryRangeMap(getPartitionCount());
+ partitionRanges = getPartitionedQueryRanges(getPartitionCount());
} catch (SQLException e) {
LOG.error("Exception in initializing the partition range", e);
throw new RuntimeException(e);
@@ -436,24 +481,25 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
}
KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
+ int pollOffset = 0;
// The n given partitions are for range queries and n + 1 partition is for polling query
- for (int i = 0; i <= getPartitionCount(); i++) {
+ for (KeyValPair<Integer, Integer> range : partitionRanges) {
AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
- if (i < getPartitionCount()) {
- jdbcPoller.rangeQueryPair = partitionToRangeMap.get(i);
- jdbcPoller.lastEmittedRow = partitionToRangeMap.get(i).getKey();
- jdbcPoller.isPollerPartition = false;
- } else {
- // The upper bound for the n+1 partition is set to null since its a pollable partition
- int partitionKey = partitionToRangeMap.get(i - 1).getValue();
- jdbcPoller.rangeQueryPair = new KeyValPair<>(partitionKey, null);
- jdbcPoller.lastEmittedRow = partitionKey;
- jdbcPoller.isPollerPartition = true;
- }
+ jdbcPoller.rangeQueryPair = range;
+ jdbcPoller.lastEmittedRow = range.getKey();
+ jdbcPoller.isPollerPartition = false;
newPartitions.add(new DefaultPartition<>(jdbcPoller));
+ pollOffset = range.getValue();
}
+ // The upper bound for the n+1 partition is set to null since it's a pollable partition
+ AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
+ jdbcPoller.rangeQueryPair = new KeyValPair<>(pollOffset, null);
+ jdbcPoller.lastEmittedRow = pollOffset;
+ jdbcPoller.isPollerPartition = true;
+ newPartitions.add(new DefaultPartition<>(jdbcPoller));
+
return newPartitions;
}
@@ -464,25 +510,34 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
// Nothing to implement here
}
- private HashMap<Integer, KeyValPair<Integer, Integer>> getPartitionedQueryRangeMap(int partitions)
+ private List<KeyValPair<Integer, Integer>> getPartitionedQueryRanges(int partitions)
throws SQLException
{
+ if (partitions == 0) {
+ return new ArrayList<>(0);
+ }
+
int rowCount = 0;
try {
- rowCount = getRecordsCount();
+ rowCount = getRecordsCount(null);
} catch (SQLException e) {
LOG.error("Exception in getting the record range", e);
}
- HashMap<Integer, KeyValPair<Integer, Integer>> partitionToQueryMap = new HashMap<>();
+ List<KeyValPair<Integer, Integer>> partitionToQueryList = new ArrayList<>();
int events = (rowCount / partitions);
for (int i = 0, lowerOffset = 0, upperOffset = events; i < partitions - 1; i++, lowerOffset += events, upperOffset += events) {
- partitionToQueryMap.put(i, new KeyValPair<>(lowerOffset, upperOffset));
+ partitionToQueryList.add(new KeyValPair<>(lowerOffset, upperOffset));
}
- partitionToQueryMap.put(partitions - 1, new KeyValPair<>(events * (partitions - 1), rowCount));
- LOG.info("Partition map - " + partitionToQueryMap.toString());
- return partitionToQueryMap;
+ partitionToQueryList.add(new KeyValPair<>(events * (partitions - 1), rowCount));
+ LOG.info("Partition ranges - " + partitionToQueryList.toString());
+ return partitionToQueryList;
+ }
+
+ protected Condition andLowerBoundKeyCondition(Condition c, Object lowerBoundKey)
+ {
+ return c.and(this.key + " > ?", lowerBoundKey);
}
/**
@@ -490,12 +545,17 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
*
* @return number of records in table
*/
- private int getRecordsCount() throws SQLException
+ private int getRecordsCount(Object lowerBoundKey) throws SQLException
{
Condition condition = DSL.trueCondition();
if (getWhereCondition() != null) {
condition = condition.and(getWhereCondition());
}
+
+ if (isPollerPartition && lowerBoundKey != null) {
+ condition = andLowerBoundKeyCondition(condition, lowerBoundKey);
+ }
+
int recordsCount = dslContext.select(DSL.count()).from(getTableName()).where(condition).fetchOne(0, int.class);
return recordsCount;
}
@@ -503,13 +563,17 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
/**
* Helper function returns a range query based on the bounds passed<br>
*/
- protected String buildRangeQuery(int offset, int limit)
+ protected String buildRangeQuery(Object lowerBoundKey, int offset, int limit)
{
Condition condition = DSL.trueCondition();
if (getWhereCondition() != null) {
condition = condition.and(getWhereCondition());
}
+ if (isPollerPartition && lowerBoundKey != null) {
+ condition = andLowerBoundKeyCondition(condition, lowerBoundKey);
+ }
+
String sqlQuery;
if (getColumnsExpression() != null) {
Collection<Field<?>> columns = new ArrayList<>();
@@ -563,7 +627,8 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
}
/**
- * Sets non-polling static partitions count
+ * Sets non-polling static partitions count.<p>
+ * When set to 0, the operator will run in poll mode only.
*
* @param partitionCount
*/
@@ -750,6 +815,24 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
this.resultLimit = resultLimit;
}
- private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
+ public boolean isRebaseOffset()
+ {
+ return rebaseOffset;
+ }
+
+ /**
+ * Whether the query should automatically be augmented with a WHERE
+ * condition for trailing lower bound key value.
+ * <p>
+ * Rebase allows the operator to poll from tables where old data is
+ * periodically purged. Without it, the default zero based row offset would
+ * lead to missed data. The trailing floor is also more efficient when working
+ * with key partitioned sources as the query planner can skip those partitions
+ * below the base key.
+ */
+ public void setRebaseOffset(boolean rebaseOffset)
+ {
+ this.rebaseOffset = rebaseOffset;
+ }
}
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
index 897fe10..9f2c3e3 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
@@ -186,7 +186,7 @@ public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Obj
protected void populateColumnDataTypes() throws SQLException
{
columnDataTypes = Lists.newArrayList();
- try (PreparedStatement stmt = store.getConnection().prepareStatement(buildRangeQuery(1, 1))) {
+ try (PreparedStatement stmt = store.getConnection().prepareStatement(buildRangeQuery(null, 1, 1))) {
Map<String, Integer> nameToType = Maps.newHashMap();
ResultSetMetaData rsMetaData = stmt.getMetaData();
LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
index 1f8d7db..972dacb 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
@@ -38,7 +38,6 @@ import org.mockito.MockitoAnnotations;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.tuple.MutablePair;
import com.google.common.collect.Lists;
@@ -47,6 +46,7 @@ import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.db.jdbc.AbstractJdbcPollInputOperator.WindowData;
import com.datatorrent.lib.helper.TestPortContext;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.FieldInfo;
@@ -70,9 +70,10 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
private WindowDataManager windowDataManagerMock;
@Before
- public void beforeTest()
+ public void beforeTest() throws IOException
{
dir = "target/" + APP_ID + "/";
+ FileUtils.deleteDirectory(new File(dir));
MockitoAnnotations.initMocks(this);
when(mockscheduler.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)))
@@ -186,7 +187,7 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
{
int operatorId = 1;
when(windowDataManagerMock.getLargestCompletedWindow()).thenReturn(1L);
- when(windowDataManagerMock.retrieve(1)).thenReturn(new MutablePair<>(0, 4));
+ when(windowDataManagerMock.retrieve(1)).thenReturn(WindowData.of(null, 0, 4));
insertEvents(10, true, 0);
@@ -337,4 +338,77 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
return fieldInfos;
}
+ @Test
+ public void testPollWithOffsetRebase() throws Exception
+ {
+ insertEvents(0, true, 0); // clear table
+
+ JdbcStore store = new JdbcStore();
+ store.setDatabaseDriver(DB_DRIVER);
+ store.setDatabaseUrl(URL);
+
+ List<FieldInfo> fieldInfos = getFieldInfos();
+
+ Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+ TestPortContext tpc = new TestPortContext(portAttributes);
+
+ JdbcPOJOPollInputOperator inputOperator = new JdbcPOJOPollInputOperator();
+ inputOperator.setStore(store);
+ inputOperator.setTableName(TABLE_POJO_NAME);
+ inputOperator.setColumnsExpression("ID,STARTDATE,STARTTIME,STARTTIMESTAMP");
+ inputOperator.setKey("id");
+ inputOperator.setFieldInfos(fieldInfos);
+ inputOperator.setFetchSize(100);
+ inputOperator.setBatchSize(100);
+ inputOperator.setPartitionCount(1);
+ inputOperator.setRebaseOffset(true);
+
+ Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newPartitions = inputOperator
+ .definePartitions(new ArrayList<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>>(), null);
+
+ int operatorId = 0;
+ for (com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>> partition : newPartitions) {
+
+ Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
+
+ OperatorContext partitioningContext = mockOperatorContext(operatorId++, partitionAttributeMap);
+
+ JdbcPOJOPollInputOperator parition = (JdbcPOJOPollInputOperator)partition.getPartitionedInstance();
+ parition.outputPort.setup(tpc);
+ parition.setScheduledExecutorService(mockscheduler);
+ parition.setup(partitioningContext);
+ parition.activate(partitioningContext);
+ }
+
+ Iterator<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> itr = newPartitions
+ .iterator();
+ // First partition is for range queries, last is for polling queries
+ JdbcPOJOPollInputOperator firstInstance = (JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance();
+
+ int rows = 0;
+ int windowId = 0;
+ insertEvents(4, false, rows);
+ rows += 4;
+ JdbcPOJOPollInputOperator poller = (JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance();
+ CollectorTestSink<Object> sink3 = new CollectorTestSink<>();
+ poller.outputPort.setSink(sink3);
+ poller.beginWindow(windowId++);
+ poller.pollRecords();
+ poller.emitTuples();
+ Assert.assertEquals("emitted", rows, sink3.collectedTuples.size());
+ poller.endWindow();
+
+ insertEvents(1, false, rows);
+ rows += 1;
+ poller.beginWindow(windowId++);
+ poller.pollRecords(); // offset rebase, fetch 1 record
+ poller.emitTuples();
+ Assert.assertEquals("emitted", rows, sink3.collectedTuples.size());
+ poller.endWindow();
+
+ }
+
}
--
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].