You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/01/24 05:09:58 UTC

apex-malhar git commit: APEXMALHAR-2368 code changes to optimize memory usage in JdbcPoll operator

Repository: apex-malhar
Updated Branches:
  refs/heads/master 1ae14c03a -> cf896b055


APEXMALHAR-2368 code changes to optimize memory usage in JdbcPoll operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/cf896b05
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/cf896b05
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/cf896b05

Branch: refs/heads/master
Commit: cf896b055723786bfe610fe1679408db07943a0d
Parents: 1ae14c0
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Tue Dec 13 19:39:43 2016 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Mon Jan 23 14:08:23 2017 +0530

----------------------------------------------------------------------
 .../db/jdbc/AbstractJdbcPollInputOperator.java  | 90 +++++++++++++-------
 1 file changed, 60 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cf896b05/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
index 6bd5121..86a443c 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -100,9 +100,14 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   private static int DEFAULT_FETCH_SIZE = 20000;
   private static int DEFAULT_BATCH_SIZE = 2000;
   private static int DEFAULT_SLEEP_TIME = 100;
+  private static int DEFAULT_RESULT_LIMIT = 100000;
   private int pollInterval = DEFAULT_POLL_INTERVAL; //in miliseconds
   private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
   private int fetchSize = DEFAULT_FETCH_SIZE;
+  /**
+   * Parameter to limit the number of results to fetch in one query by the Poller partition.
+   */
+  private int resultLimit = DEFAULT_RESULT_LIMIT;
 
   @Min(1)
   private int partitionCount = 1;
@@ -127,13 +132,14 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   private transient ScheduledExecutorService scanService;
   private transient AtomicReference<Throwable> threadException;
   protected transient boolean isPolled;
-  protected transient Integer lastPolledRow;
   protected transient LinkedBlockingDeque<T> emitQueue;
   protected transient PreparedStatement ps;
   protected boolean isPollerPartition;
 
   protected transient MutablePair<Integer, Integer> currentWindowRecoveryState;
 
+  private transient int lastOffset;
+
   public AbstractJdbcPollInputOperator()
   {
     currentWindowRecoveryState = new MutablePair<>();
@@ -175,8 +181,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     try {
       // If its a range query pass upper and lower bounds, If its a polling query pass only the lower bound
       if (isPollerPartition) {
-        ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), Integer.MAX_VALUE),
-            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
+        lastOffset = rangeQueryPair.getKey();
       } else {
         ps = store.getConnection().prepareStatement(
             buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())),
@@ -203,30 +208,11 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
       }
     }
     if (isPollerPartition) {
-      updatePollQuery();
       isPolled = false;
     }
     lowerBound = lastEmittedRow;
   }
 
-  private void updatePollQuery()
-  {
-    if ((lastPolledRow != lastEmittedRow)) {
-      if (lastEmittedRow == null) {
-        lastPolledRow = rangeQueryPair.getKey();
-      } else {
-        lastPolledRow = lastEmittedRow;
-      }
-      try {
-        ps = store.getConnection().prepareStatement(buildRangeQuery(lastPolledRow, Integer.MAX_VALUE),
-            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
-      } catch (SQLException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-  }
-
   @Override
   public void emitTuples()
   {
@@ -269,20 +255,42 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     store.disconnect();
   }
 
+  /**
+   * Executes the statement and offers each returned row, converted via {@link #getTuple}, to the emit queue.
+   * @param preparedStatement statement to run; always closed with its ResultSet, even on empty results or errors.
+   */
+  protected void insertDbDataInQueue(PreparedStatement preparedStatement) throws SQLException, InterruptedException
+  {
+    try (PreparedStatement statement = preparedStatement) {
+      statement.setFetchSize(getFetchSize());
+      try (ResultSet result = statement.executeQuery()) {
+        while (result.next()) {
+          T tuple = getTuple(result); // convert the row once, not on every retry
+          while (!emitQueue.offer(tuple)) { // queue full: back off and retry
+            Thread.sleep(DEFAULT_SLEEP_TIME);
+          }
+        }
+      }
+    }
+  }
+
   protected void pollRecords()
   {
     if (isPolled) {
       return;
     }
     try {
-      ps.setFetchSize(getFetchSize());
-      ResultSet result = ps.executeQuery();
-      if (result.next()) {
-        do {
-          while (!emitQueue.offer(getTuple(result))) {
-            Thread.sleep(DEFAULT_SLEEP_TIME);
-          }
-        } while (result.next());
+      if (isPollerPartition) {
+        int nextOffset = getRecordsCount();
+        while (lastOffset < nextOffset) {
+          PreparedStatement preparedStatement = store.getConnection().prepareStatement(buildRangeQuery(lastOffset, resultLimit),
+              TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
+          insertDbDataInQueue(preparedStatement);
+          lastOffset = lastOffset + resultLimit;
+        }
+        lastOffset = nextOffset;
+      } else {
+        insertDbDataInQueue(ps);
       }
       isPolled = true;
     } catch (SQLException ex) {
@@ -295,6 +303,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
         store.disconnect();
       }
     }
+    isPolled = true;
   }
 
   public abstract T getTuple(ResultSet result);
@@ -334,6 +343,9 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
             }
             ps = store.getConnection().prepareStatement(buildRangeQuery(bound, Integer.MAX_VALUE), TYPE_FORWARD_ONLY,
                 CONCUR_READ_ONLY);
+            if (isPollerPartition) {
+              lastOffset = bound;
+            }
           }
           scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
         } catch (SQLException e) {
@@ -717,6 +729,24 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     this.key = key;
   }
 
+  /**
+   * Returns the cap on how many rows the poller partition requests in a
+   * single query.
+   */
+  public int getResultLimit()
+  {
+    return this.resultLimit;
+  }
+
+  /**
+   * Sets the maximum number of rows the poller partition fetches in one query.
+   * @param resultLimit rows per query; must be at least 1, otherwise the poller would never advance its offset.
+   * @throws IllegalArgumentException if resultLimit is less than 1
+   */
+  public void setResultLimit(int resultLimit)
+  {
+    if (resultLimit < 1) {
+      throw new IllegalArgumentException("resultLimit must be at least 1 but was " + resultLimit);
+    }
+    this.resultLimit = resultLimit;
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
 
 }