You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/01/24 05:09:58 UTC
apex-malhar git commit: APEXMALHAR-2368 code changes to optimize memory usage in JdbcPoll operator
Repository: apex-malhar
Updated Branches:
refs/heads/master 1ae14c03a -> cf896b055
APEXMALHAR-2368 code changes to optimize memory usage in JdbcPoll operator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/cf896b05
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/cf896b05
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/cf896b05
Branch: refs/heads/master
Commit: cf896b055723786bfe610fe1679408db07943a0d
Parents: 1ae14c0
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Tue Dec 13 19:39:43 2016 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Mon Jan 23 14:08:23 2017 +0530
----------------------------------------------------------------------
.../db/jdbc/AbstractJdbcPollInputOperator.java | 90 +++++++++++++-------
1 file changed, 60 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cf896b05/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
index 6bd5121..86a443c 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -100,9 +100,14 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
private static int DEFAULT_FETCH_SIZE = 20000;
private static int DEFAULT_BATCH_SIZE = 2000;
private static int DEFAULT_SLEEP_TIME = 100;
+ private static int DEFAULT_RESULT_LIMIT = 100000;
private int pollInterval = DEFAULT_POLL_INTERVAL; //in miliseconds
private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
private int fetchSize = DEFAULT_FETCH_SIZE;
+ /**
+ * Parameter to limit the number of results to fetch in one query by the Poller partition.
+ */
+ private int resultLimit = DEFAULT_RESULT_LIMIT;
@Min(1)
private int partitionCount = 1;
@@ -127,13 +132,14 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
private transient ScheduledExecutorService scanService;
private transient AtomicReference<Throwable> threadException;
protected transient boolean isPolled;
- protected transient Integer lastPolledRow;
protected transient LinkedBlockingDeque<T> emitQueue;
protected transient PreparedStatement ps;
protected boolean isPollerPartition;
protected transient MutablePair<Integer, Integer> currentWindowRecoveryState;
+ private transient int lastOffset;
+
public AbstractJdbcPollInputOperator()
{
currentWindowRecoveryState = new MutablePair<>();
@@ -175,8 +181,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
try {
// If its a range query pass upper and lower bounds, If its a polling query pass only the lower bound
if (isPollerPartition) {
- ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), Integer.MAX_VALUE),
- TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
+ lastOffset = rangeQueryPair.getKey();
} else {
ps = store.getConnection().prepareStatement(
buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())),
@@ -203,30 +208,11 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
}
}
if (isPollerPartition) {
- updatePollQuery();
isPolled = false;
}
lowerBound = lastEmittedRow;
}
- private void updatePollQuery()
- {
- if ((lastPolledRow != lastEmittedRow)) {
- if (lastEmittedRow == null) {
- lastPolledRow = rangeQueryPair.getKey();
- } else {
- lastPolledRow = lastEmittedRow;
- }
- try {
- ps = store.getConnection().prepareStatement(buildRangeQuery(lastPolledRow, Integer.MAX_VALUE),
- TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
- }
-
@Override
public void emitTuples()
{
@@ -269,20 +255,42 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
store.disconnect();
}
+  /**
+   * Executes the given statement and appends every row of its result set, converted with
+   * {@link #getTuple(ResultSet)}, to the emit queue. While the queue is full the calling thread
+   * sleeps DEFAULT_SLEEP_TIME milliseconds and retries with the same tuple.
+   * <p>
+   * The statement and its result set are always closed by this method, including when the query
+   * fails or the thread is interrupted, so callers must not reuse {@code preparedStatement}.
+   *
+   * @param preparedStatement statement to execute; closed by this method
+   * @throws SQLException if executing the query or reading a row fails
+   * @throws InterruptedException if interrupted while waiting for space in the emit queue
+   */
+  protected void insertDbDataInQueue(PreparedStatement preparedStatement) throws SQLException, InterruptedException
+  {
+    // try-with-resources guarantees cleanup on every exit path (Java 7 compatible form)
+    try (PreparedStatement statement = preparedStatement) {
+      statement.setFetchSize(getFetchSize());
+      try (ResultSet result = statement.executeQuery()) {
+        while (result.next()) {
+          // build the tuple once so retries on a full queue do not re-read the current row
+          T tuple = getTuple(result);
+          while (!emitQueue.offer(tuple)) {
+            Thread.sleep(DEFAULT_SLEEP_TIME);
+          }
+        }
+      }
+    }
+  }
+
protected void pollRecords()
{
if (isPolled) {
return;
}
try {
- ps.setFetchSize(getFetchSize());
- ResultSet result = ps.executeQuery();
- if (result.next()) {
- do {
- while (!emitQueue.offer(getTuple(result))) {
- Thread.sleep(DEFAULT_SLEEP_TIME);
- }
- } while (result.next());
+ if (isPollerPartition) {
+ int nextOffset = getRecordsCount();
+ while (lastOffset < nextOffset) {
+ PreparedStatement preparedStatement = store.getConnection().prepareStatement(buildRangeQuery(lastOffset, resultLimit),
+ TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
+ insertDbDataInQueue(preparedStatement);
+ lastOffset = lastOffset + resultLimit;
+ }
+ lastOffset = nextOffset;
+ } else {
+ insertDbDataInQueue(ps);
}
isPolled = true;
} catch (SQLException ex) {
@@ -295,6 +303,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
store.disconnect();
}
}
+ isPolled = true;
}
public abstract T getTuple(ResultSet result);
@@ -334,6 +343,9 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
}
ps = store.getConnection().prepareStatement(buildRangeQuery(bound, Integer.MAX_VALUE), TYPE_FORWARD_ONLY,
CONCUR_READ_ONLY);
+ if (isPollerPartition) {
+ lastOffset = bound;
+ }
}
scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
} catch (SQLException e) {
@@ -717,6 +729,24 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
this.key = key;
}
+  /**
+   * Returns the maximum number of rows the poller partition requests in a single range query.
+   *
+   * @return the configured per-query row limit
+   */
+  public int getResultLimit()
+  {
+    return this.resultLimit;
+  }
+
+  /**
+   * Sets the maximum number of rows the poller partition fetches with a single range query.
+   * When more rows are pending, they are read in successive queries of at most this many rows.
+   *
+   * @param resultLimit maximum rows per query; must be at least 1
+   * @throws IllegalArgumentException if {@code resultLimit} is less than 1, because the poller
+   *     advances its offset by this amount and would otherwise never finish a polling pass
+   */
+  public void setResultLimit(int resultLimit)
+  {
+    if (resultLimit < 1) {
+      throw new IllegalArgumentException("resultLimit must be at least 1 but was " + resultLimit);
+    }
+    this.resultLimit = resultLimit;
+  }
+
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
}