Posted to commits@apex.apache.org by bh...@apache.org on 2016/08/11 11:37:22 UTC
[2/2] apex-malhar git commit: APEXMALHAR-2172: Updates to JDBC Poll Input Operator
APEXMALHAR-2172: Updates to JDBC Poll Input Operator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/26fa9d78
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/26fa9d78
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/26fa9d78
Branch: refs/heads/master
Commit: 26fa9d781e032fd61fbb2972a99874e77e0c509c
Parents: 3316d6a
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Tue Jul 19 12:10:08 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Thu Aug 11 15:56:42 2016 +0530
----------------------------------------------------------------------
library/pom.xml | 5 +
.../db/jdbc/AbstractJdbcPollInputOperator.java | 915 ++++++++++---------
.../lib/db/jdbc/JdbcMetaDataUtility.java | 353 -------
.../lib/db/jdbc/JdbcPOJOPollInputOperator.java | 325 +++++++
.../lib/db/jdbc/JdbcPollInputOperator.java | 186 +---
.../lib/db/jdbc/JdbcOperatorTest.java | 627 +------------
.../lib/db/jdbc/JdbcPojoOperatorTest.java | 606 ++++++++++++
.../db/jdbc/JdbcPojoPollableOpeartorTest.java | 241 +++++
.../datatorrent/lib/db/jdbc/JdbcPollerTest.java | 246 -----
9 files changed, 1707 insertions(+), 1797 deletions(-)
----------------------------------------------------------------------
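The commit reworks the poller around integer row offsets, jOOQ-generated range queries, and a WindowDataManager for idempotent replay. A minimal wiring sketch for the reworked operator, assuming HSQLDB and an illustrative table test_event_table with columns event_id and event_name (the driver, URL, table and column names are not part of this commit):

    import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.api.annotation.ApplicationAnnotation;
    import com.datatorrent.lib.db.jdbc.JdbcPollInputOperator;
    import com.datatorrent.lib.db.jdbc.JdbcStore;
    import com.datatorrent.lib.io.ConsoleOutputOperator;

    // Hypothetical application wiring the CSV-emitting poller to a console sink.
    @ApplicationAnnotation(name = "JdbcPollExample")
    public class JdbcPollExample implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        JdbcStore store = new JdbcStore();
        store.setDatabaseDriver("org.hsqldb.jdbcDriver");   // assumption: HSQLDB
        store.setDatabaseUrl("jdbc:hsqldb:mem:test");

        JdbcPollInputOperator poller = dag.addOperator("JdbcPoller", new JdbcPollInputOperator());
        poller.setStore(store);
        poller.setTableName("test_event_table");
        poller.setKey("event_id");                          // ordered unique column
        poller.setColumnsExpression("event_id,event_name"); // replaces setEmitColumnList
        poller.setPartitionCount(2);                        // 2 static + 1 polling partition
        poller.setPollInterval(1000);                       // in milliseconds
        poller.setWindowManager(new FSWindowDataManager()); // idempotent replay state

        ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
        dag.addStream("csvRows", poller.outputPort, console.input);
      }
    }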
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index b8c6271..a6bdf64 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -341,6 +341,11 @@
<version>7.0.6</version>
</dependency>
<dependency>
+ <groupId>org.jooq</groupId>
+ <artifactId>jooq</artifactId>
+ <version>3.6.4</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.apex</groupId>
<artifactId>apex-shaded-ning19</artifactId>
<version>1.0.0</version>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
index ab12ed3..234e28c 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -27,11 +27,23 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.Min;
-
+import javax.validation.constraints.NotNull;
+
+import org.jooq.Condition;
+import org.jooq.DSLContext;
+import org.jooq.Field;
+import org.jooq.SelectField;
+import org.jooq.conf.ParamType;
+import org.jooq.impl.DSL;
+import org.jooq.tools.jdbc.JDBCUtils;
+import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
@@ -39,242 +51,86 @@ import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import com.google.common.annotations.VisibleForTesting;
+
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator.ActivationListener;
-import com.datatorrent.api.Operator.IdleTimeHandler;
import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.db.AbstractStoreInputOperator;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.netlet.util.DTThrowable;
+import static java.sql.ResultSet.CONCUR_READ_ONLY;
+import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
+import static org.jooq.impl.DSL.field;
+
/**
- * Abstract operator for for consuming data using JDBC interface<br>
- * User needs User needs to provide
- * tableName,dbConnection,setEmitColumnList,look-up key <br>
- * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
- * <br>
- * This operator uses static partitioning to arrive at range queries for exactly
- * once reads<br>
+ * Abstract operator for consuming data using JDBC interface<br>
+ * User needs to provide tableName, dbConnection, columnsExpression, look-up key<br>
+ * Optionally batchSize, pollInterval and a where clause can be given <br>
+ * This operator uses static partitioning to arrive at range queries for
+ * idempotent reads<br>
* This operator will create a configured number of non-polling static
* partitions for fetching the existing data in the table. And an additional
* single partition for polling additive data. Assumption is that there is an
- * ordered column using which range queries can be formed<br>
- * If an emitColumnList is provided, please ensure that the keyColumn is the
- * first column in the list<br>
- * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
- * comma separated list of the emit columns eg columnA,columnB,columnC<br>
- * Only newly added data which has increasing ids will be fetched by the polling
- * jdbc partition
+ * ordered unique column using which range queries can be formed<br>
*
- * In the next iterations this operator would support an in-clause for
- * idempotency instead of having only range query support to support non ordered
- * key columns
+ * Only newly added data will be fetched by the polling jdbc partition; the
+ * assumption is that rows won't be added or deleted in the middle of a scan.
*
*
* @displayName Jdbc Polling Input Operator
* @category Input
- * @tags database, sql, jdbc, partitionable,exactlyOnce
+ * @tags database, sql, jdbc, partitionable, idempotent, pollable
*/
@Evolving
-public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore>
- implements ActivationListener<Context>, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator<T>>
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> implements
+ ActivationListener<OperatorContext>, Partitioner<AbstractJdbcPollInputOperator<T>>
{
- /**
- * poll interval in milliseconds
- */
- private static int pollInterval = 10000;
+ private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024;
+ private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+ private static int DEFAULT_FETCH_SIZE = 20000;
+ private static int DEFAULT_BATCH_SIZE = 2000;
+ private static int DEFAULT_SLEEP_TIME = 100;
+ private int pollInterval = DEFAULT_POLL_INTERVAL; // in milliseconds
+ private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+ private int fetchSize = DEFAULT_FETCH_SIZE;
@Min(1)
private int partitionCount = 1;
- protected transient int operatorId;
- protected transient boolean isReplayed;
- protected transient boolean isPollable;
- protected int batchSize;
- protected static int fetchSize = 20000;
- /**
- * Map of windowId to <lower bound,upper bound> of the range key
- */
- protected transient MutablePair<String, String> currentWindowRecoveryState;
-
- /**
- * size of the emit queue used to hold polled records before emit
- */
- private static int queueCapacity = 4 * 1024 * 1024;
+ private int batchSize = DEFAULT_BATCH_SIZE;
+
+ @NotNull
+ private String tableName;
+ @NotNull
+ private String columnsExpression;
+ @NotNull
+ private String key;
+ private String whereCondition = null;
+ private long currentWindowId;
+ private WindowDataManager windowManager;
+
+ protected KeyValPair<Integer, Integer> rangeQueryPair;
+ protected Integer lowerBound;
+ protected Integer lastEmittedRow;
+ private transient int operatorId;
+ private transient DSLContext create;
private transient volatile boolean execute;
- private transient AtomicReference<Throwable> cause;
- protected transient int spinMillis;
- private transient OperatorContext context;
- protected String tableName;
- protected String key;
- protected long currentWindowId;
- protected KeyValPair<String, String> rangeQueryPair;
- protected String lower;
- protected String upper;
- protected boolean recovered;
- protected boolean isPolled;
- protected String whereCondition = null;
- protected String previousUpperBound;
- protected String highestPolled;
- private static final String user = "";
- private static final String password = "";
- /**
- * thread to poll database
- */
- private transient Thread dbPoller;
- protected transient ArrayBlockingQueue<List<T>> emitQueue;
+ private transient ScheduledExecutorService scanService;
+ private transient AtomicReference<Throwable> threadException;
+ protected transient boolean isPolled;
+ protected transient Integer lastPolledRow;
+ protected transient LinkedBlockingDeque<T> emitQueue;
protected transient PreparedStatement ps;
- protected WindowDataManager windowManager;
- private String emitColumnList;
-
- /**
- * Returns the where clause
- */
- public String getWhereCondition()
- {
- return whereCondition;
- }
-
- /**
- * Sets the where clause
- */
- public void setWhereCondition(String whereCondition)
- {
- this.whereCondition = whereCondition;
- }
-
- /**
- * Returns the list of columns to select from the query
- */
- public String getEmitColumnList()
- {
- return emitColumnList;
- }
-
- /**
- * Comma separated list of columns to select from the given table
- */
- public void setEmitColumnList(String emitColumnList)
- {
- this.emitColumnList = emitColumnList;
- }
-
- /**
- * Returns the fetchsize for getting the results
- */
- public int getFetchSize()
- {
- return fetchSize;
- }
-
- /**
- * Sets the fetchsize for getting the results
- */
- public void setFetchSize(int fetchSize)
- {
- this.fetchSize = fetchSize;
- }
-
- protected abstract void pollRecords(PreparedStatement ps);
-
- /**
- * Returns the interval for polling the queue
- */
- public int getPollInterval()
- {
- return pollInterval;
- }
-
- /**
- * Sets the interval for polling the emit queue
- */
- public void setPollInterval(int pollInterval)
- {
- this.pollInterval = pollInterval;
- }
-
- /**
- * Returns the capacity of the emit queue
- */
- public int getQueueCapacity()
- {
- return queueCapacity;
- }
-
- /**
- * Sets the capacity of the emit queue
- */
- public void setQueueCapacity(int queueCapacity)
- {
- this.queueCapacity = queueCapacity;
- }
-
- /**
- * Returns the ordered key used to generate the range queries
- */
- public String getKey()
- {
- return key;
- }
-
- /**
- * Sets the ordered key used to generate the range queries
- */
- public void setKey(String key)
- {
- this.key = key;
- }
+ protected boolean isPollerPartition;
- /**
- * Returns the tableName which would be queried
- */
- public String getTableName()
- {
- return tableName;
- }
-
- /**
- * Sets the tableName to query
- */
- public void setTableName(String tableName)
- {
- this.tableName = tableName;
- }
-
- /**
- * Returns rangeQueryPair - <lowerBound,upperBound>
- */
- public KeyValPair<String, String> getRangeQueryPair()
- {
- return rangeQueryPair;
- }
-
- /**
- * Sets the rangeQueryPair <lowerBound,upperBound>
- */
- public void setRangeQueryPair(KeyValPair<String, String> rangeQueryPair)
- {
- this.rangeQueryPair = rangeQueryPair;
- }
-
- /**
- * Returns batchSize indicating the number of elements in emitQueue
- */
- public int getBatchSize()
- {
- return batchSize;
- }
-
- /**
- * Sets batchSize for number of elements in the emitQueue
- */
- public void setBatchSize(int batchSize)
- {
- this.batchSize = batchSize;
- }
+ protected transient MutablePair<Integer, Integer> currentWindowRecoveryState;
public AbstractJdbcPollInputOperator()
{
@@ -286,271 +142,234 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
public void setup(OperatorContext context)
{
super.setup(context);
- spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+ initializeDSLContext();
+ if (scanService == null) {
+ scanService = Executors.newScheduledThreadPool(1);
+ }
execute = true;
- cause = new AtomicReference<Throwable>();
- emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
- this.context = context;
+ emitQueue = new LinkedBlockingDeque<>(queueCapacity);
operatorId = context.getId();
+ windowManager.setup(context);
+ }
- try {
+ private void initializeDSLContext()
+ {
+ create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl()));
+ }
+
+ @Override
+ public void activate(OperatorContext context)
+ {
+ initializePreparedStatement();
+ long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
+ if (largestRecoveryWindow == Stateless.WINDOW_ID
+ || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
+ scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
+ }
+ }
- //If its a range query pass upper and lower bounds
- //If its a polling query pass only the lower bound
- if (getRangeQueryPair().getValue() != null) {
- ps = store.getConnection()
- .prepareStatement(
- JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(),
- rangeQueryPair.getValue()),
- java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
+ protected void initializePreparedStatement()
+ {
+ try {
+ // If it's a range query, pass upper and lower bounds; if it's a polling query, pass only the lower bound
+ if (isPollerPartition) {
+ ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), Integer.MAX_VALUE),
+ TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
} else {
ps = store.getConnection().prepareStatement(
- JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey()),
- java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
- isPollable = true;
+ buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+ TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
}
-
} catch (SQLException e) {
LOG.error("Exception in initializing the range query for a given partition", e);
throw new RuntimeException(e);
}
- windowManager.setup(context);
- LOG.debug("super setup done...");
}
@Override
public void beginWindow(long windowId)
{
currentWindowId = windowId;
-
- isReplayed = false;
-
if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
try {
replay(currentWindowId);
+ return;
} catch (SQLException e) {
LOG.error("Exception in replayed windows", e);
throw new RuntimeException(e);
}
}
-
- if (isReplayed && currentWindowId == windowManager.getLargestRecoveryWindow()) {
- try {
- if (!isPollable && rangeQueryPair.getValue() != null) {
-
- ps = store.getConnection().prepareStatement(
- JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), getKey(), previousUpperBound,
- rangeQueryPair.getValue()),
- java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
- } else {
- String bound = null;
- if (previousUpperBound == null) {
- bound = getRangeQueryPair().getKey();
- } else {
- bound = previousUpperBound;
- }
- ps = store.getConnection().prepareStatement(
- JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
- java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
- isPollable = true;
- }
- isReplayed = false;
- LOG.debug("Prepared statement after re-initialization - {} ", ps.toString());
- } catch (SQLException e) {
- // TODO Auto-generated catch block
- throw new RuntimeException(e);
- }
+ if (isPollerPartition) {
+ updatePollQuery();
+ isPolled = false;
}
+ lowerBound = lastEmittedRow;
+ }
- //Reset the pollable query with the updated upper and lower bounds
- if (isPollable) {
+ private void updatePollQuery()
+ {
+ if (lastPolledRow != lastEmittedRow) {
+ if (lastEmittedRow == null) {
+ lastPolledRow = rangeQueryPair.getKey();
+ } else {
+ lastPolledRow = lastEmittedRow;
+ }
try {
- String bound = null;
- if (previousUpperBound == null && highestPolled == null) {
- bound = getRangeQueryPair().getKey();
- } else {
- bound = highestPolled;
- }
- ps = store.getConnection().prepareStatement(
- JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
- java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
- LOG.debug("Polling query {} {}", ps.toString(), currentWindowId);
- isPolled = false;
+ ps = store.getConnection().prepareStatement(buildRangeQuery(lastPolledRow, Integer.MAX_VALUE),
+ TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
- lower = null;
- upper = null;
-
- //Check if a thread is already active and start only if its no
- //Do not start the thread from setup, will conflict with the replay
- if (dbPoller == null && !isReplayed) {
- //If this is not a replayed state, reset the ps to highest read offset + 1,
- //keep the upper bound as the one that was initialized after static partitioning
- LOG.info("Statement when re-initialized {}", ps.toString());
- dbPoller = new Thread(new DBPoller());
- dbPoller.start();
- }
}
@Override
public void emitTuples()
{
- if (isReplayed) {
+ if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
return;
}
-
- List<T> tuples;
-
- if ((tuples = emitQueue.poll()) != null) {
- for (Object tuple : tuples) {
- if (lower == null) {
- lower = tuple.toString();
- }
- upper = tuple.toString();
- outputPort.emit((T)tuple);
+ int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() : batchSize;
+ while (pollSize-- > 0) {
+ T obj = emitQueue.poll();
+ if (obj != null) {
+ emitTuple(obj);
}
+ lastEmittedRow++;
}
}
+ protected abstract void emitTuple(T tuple);
+
@Override
public void endWindow()
{
try {
if (currentWindowId > windowManager.getLargestRecoveryWindow()) {
- if (currentWindowRecoveryState == null) {
- currentWindowRecoveryState = new MutablePair<String, String>();
- }
- if (lower != null && upper != null) {
- previousUpperBound = upper;
- isPolled = true;
- }
- MutablePair<String, String> windowBoundaryPair = new MutablePair<>(lower, upper);
- currentWindowRecoveryState = windowBoundaryPair;
+ currentWindowRecoveryState = new MutablePair<>(lowerBound, lastEmittedRow);
windowManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
}
} catch (IOException e) {
throw new RuntimeException("saving recovery", e);
}
- currentWindowRecoveryState = new MutablePair<>();
- }
-
- public int getPartitionCount()
- {
- return partitionCount;
+ if (threadException != null) {
+ store.disconnect();
+ DTThrowable.rethrow(threadException.get());
+ }
}
- public void setPartitionCount(int partitionCount)
+ @Override
+ public void deactivate()
{
- this.partitionCount = partitionCount;
+ scanService.shutdownNow();
+ store.disconnect();
}
- @Override
- public void activate(Context cntxt)
+ protected void pollRecords()
{
- if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID
- && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) {
- // If it is a replay state, don't start any threads here
+ if (isPolled) {
return;
}
- }
-
- @Override
- public void deactivate()
- {
try {
- if (dbPoller != null && dbPoller.isAlive()) {
- dbPoller.interrupt();
- dbPoller.join();
+ ps.setFetchSize(getFetchSize());
+ ResultSet result = ps.executeQuery();
+ if (result.next()) {
+ do {
+ while (!emitQueue.offer(getTuple(result))) {
+ Thread.sleep(DEFAULT_SLEEP_TIME);
+ }
+ } while (result.next());
}
- } catch (InterruptedException ex) {
- // log and ignore, ending execution anyway
- LOG.error("exception in poller thread: ", ex);
- }
- }
-
- @Override
- public void handleIdleTime()
- {
- if (execute) {
- try {
- Thread.sleep(spinMillis);
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie);
+ isPolled = true;
+ } catch (SQLException ex) {
+ execute = false;
+ threadException = new AtomicReference<Throwable>(ex);
+ } catch (InterruptedException e) {
+ threadException = new AtomicReference<Throwable>(e);
+ } finally {
+ if (!isPollerPartition) {
+ store.disconnect();
}
- } else {
- LOG.error("Exception: ", cause);
- DTThrowable.rethrow(cause.get());
}
}
+ public abstract T getTuple(ResultSet result);
+
protected void replay(long windowId) throws SQLException
{
- isReplayed = true;
- MutablePair<String, String> recoveredData = new MutablePair<String, String>();
try {
- recoveredData = (MutablePair<String, String>)windowManager.load(operatorId, windowId);
+ MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.load(operatorId,
+ windowId);
- if (recoveredData != null) {
- //skip the window and return if there was no incoming data in the window
- if (recoveredData.left == null || recoveredData.right == null) {
- return;
- }
+ if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+ LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left,
+ recoveredData.right);
- if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(previousUpperBound)) {
- LOG.info("Matched so returning");
- return;
- }
-
- JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
- jdbcPoller.setStore(store);
- jdbcPoller.setKey(getKey());
- jdbcPoller.setPartitionCount(getPartitionCount());
- jdbcPoller.setPollInterval(getPollInterval());
- jdbcPoller.setTableName(getTableName());
- jdbcPoller.setBatchSize(getBatchSize());
- isPollable = false;
-
- LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + "," + recoveredData.right + "]");
+ ps = store.getConnection().prepareStatement(
+ buildRangeQuery(recoveredData.left, (recoveredData.right - recoveredData.left)), TYPE_FORWARD_ONLY,
+ CONCUR_READ_ONLY);
+ LOG.info("Query formed to recover data - {}", ps.toString());
- jdbcPoller.setRangeQueryPair(new KeyValPair<String, String>(recoveredData.left, recoveredData.right));
+ emitReplayedTuples(ps);
- jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement(
- JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(), jdbcPoller.getKey(),
- jdbcPoller.getRangeQueryPair().getKey(), jdbcPoller.getRangeQueryPair().getValue()),
- java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
- LOG.info("Query formed for recovered data - {}", jdbcPoller.ps.toString());
+ }
- emitReplayedTuples(jdbcPoller.ps);
+ if (currentWindowId == windowManager.getLargestRecoveryWindow()) {
+ try {
+ if (!isPollerPartition && rangeQueryPair.getValue() != null) {
+ ps = store.getConnection().prepareStatement(
+ buildRangeQuery(lastEmittedRow, (rangeQueryPair.getValue() - lastEmittedRow)), TYPE_FORWARD_ONLY,
+ CONCUR_READ_ONLY);
+ } else {
+ Integer bound = null;
+ if (lastEmittedRow == null) {
+ bound = rangeQueryPair.getKey();
+ } else {
+ bound = lastEmittedRow;
+ }
+ ps = store.getConnection().prepareStatement(buildRangeQuery(bound, Integer.MAX_VALUE), TYPE_FORWARD_ONLY,
+ CONCUR_READ_ONLY);
+ }
+ scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
}
} catch (IOException e) {
- throw new RuntimeException("replay", e);
+ throw new RuntimeException("Exception during replay of records.", e);
}
}
+ private boolean shouldReplayWindow(MutablePair<Integer, Integer> recoveredData)
+ {
+ if (recoveredData.left == null || recoveredData.right == null) {
+ return false;
+ }
+ if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(lastEmittedRow)) {
+ return false;
+ }
+ return true;
+ }
+
/**
* Replays the tuples in sync mode for replayed windows
*/
public void emitReplayedTuples(PreparedStatement ps)
{
- LOG.debug("Emitting replayed statement is -" + ps.toString());
ResultSet rs = null;
try (PreparedStatement pStat = ps;) {
pStat.setFetchSize(getFetchSize());
- LOG.debug("sql query = {}", pStat);
rs = pStat.executeQuery();
if (rs == null || rs.isClosed()) {
- LOG.debug("Nothing to replay");
return;
}
while (rs.next()) {
- previousUpperBound = rs.getObject(getKey()).toString();
- outputPort.emit((T)rs.getObject(getKey()));
+ emitTuple(getTuple(rs));
+ lastEmittedRow++;
}
} catch (SQLException ex) {
throw new RuntimeException(ex);
@@ -565,59 +384,42 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
*/
@Override
public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
- Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions,
- com.datatorrent.api.Partitioner.PartitioningContext context)
+ Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions, PartitioningContext context)
{
List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(
getPartitionCount());
- JdbcStore jdbcStore = new JdbcStore();
- jdbcStore.setDatabaseDriver(store.getDatabaseDriver());
- jdbcStore.setDatabaseUrl(store.getDatabaseUrl());
- jdbcStore.setConnectionProperties(store.getConnectionProperties());
-
- jdbcStore.connect();
- HashMap<Integer, KeyValPair<String, String>> partitionToRangeMap = null;
+ HashMap<Integer, KeyValPair<Integer, Integer>> partitionToRangeMap = null;
try {
- partitionToRangeMap = JdbcMetaDataUtility.getPartitionedQueryMap(getPartitionCount(),
- jdbcStore.getDatabaseDriver(), jdbcStore.getDatabaseUrl(), getTableName(), getKey(),
- store.getConnectionProperties().getProperty(user), store.getConnectionProperties().getProperty(password),
- whereCondition, emitColumnList);
+ store.connect();
+ initializeDSLContext();
+ partitionToRangeMap = getPartitionedQueryRangeMap(getPartitionCount());
} catch (SQLException e) {
LOG.error("Exception in initializing the partition range", e);
+ throw new RuntimeException(e);
+ } finally {
+ store.disconnect();
}
KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
+ // The n given partitions are for range queries and n + 1 partition is for polling query
for (int i = 0; i <= getPartitionCount(); i++) {
- AbstractJdbcPollInputOperator<T> jdbcPoller = null;
-
- jdbcPoller = cloneUtils.getClone();
-
- jdbcPoller.setStore(store);
- jdbcPoller.setKey(getKey());
- jdbcPoller.setPartitionCount(getPartitionCount());
- jdbcPoller.setPollInterval(getPollInterval());
- jdbcPoller.setTableName(getTableName());
- jdbcPoller.setBatchSize(getBatchSize());
- jdbcPoller.setEmitColumnList(getEmitColumnList());
-
- store.connect();
- //The n given partitions are for range queries and n + 1 partition is for polling query
- //The upper bound for the n+1 partition is set to null since its a pollable partition
+ AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
if (i < getPartitionCount()) {
- jdbcPoller.setRangeQueryPair(partitionToRangeMap.get(i));
- isPollable = false;
+ jdbcPoller.rangeQueryPair = partitionToRangeMap.get(i);
+ jdbcPoller.lastEmittedRow = partitionToRangeMap.get(i).getKey();
+ jdbcPoller.isPollerPartition = false;
} else {
- jdbcPoller.setRangeQueryPair(new KeyValPair<String, String>(partitionToRangeMap.get(i - 1).getValue(), null));
- isPollable = true;
+ // The upper bound for the n+1 partition is set to null since its a pollable partition
+ int partitionKey = partitionToRangeMap.get(i - 1).getValue();
+ jdbcPoller.rangeQueryPair = new KeyValPair<Integer, Integer>(partitionKey, null);
+ jdbcPoller.lastEmittedRow = partitionKey;
+ jdbcPoller.isPollerPartition = true;
}
- Partition<AbstractJdbcPollInputOperator<T>> po = new DefaultPartition<AbstractJdbcPollInputOperator<T>>(
- jdbcPoller);
- newPartitions.add(po);
+ newPartitions.add(new DefaultPartition<AbstractJdbcPollInputOperator<T>>(jdbcPoller));
}
- previousUpperBound = null;
return newPartitions;
}
@@ -625,7 +427,69 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
public void partitioned(
Map<Integer, com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions)
{
- //Nothing to implement here
+ // Nothing to implement here
+ }
+
+ private HashMap<Integer, KeyValPair<Integer, Integer>> getPartitionedQueryRangeMap(int partitions)
+ throws SQLException
+ {
+ int rowCount = 0;
+ try {
+ rowCount = getRecordsCount();
+ } catch (SQLException e) {
+ LOG.error("Exception in getting the record range", e);
+ }
+
+ HashMap<Integer, KeyValPair<Integer, Integer>> partitionToQueryMap = new HashMap<>();
+ int events = (rowCount / partitions);
+ for (int i = 0, lowerOffset = 0, upperOffset = events; i < partitions - 1; i++, lowerOffset += events, upperOffset += events) {
+ partitionToQueryMap.put(i, new KeyValPair<Integer, Integer>(lowerOffset, upperOffset));
+ }
+
+ partitionToQueryMap.put(partitions - 1, new KeyValPair<Integer, Integer>(events * (partitions - 1), (int)rowCount));
+ LOG.info("Partition map - " + partitionToQueryMap.toString());
+ return partitionToQueryMap;
+ }
+
+ /**
+ * Finds the total number of rows in the table
+ *
+ * @return number of records in table
+ */
+ private int getRecordsCount() throws SQLException
+ {
+ Condition condition = DSL.trueCondition();
+ if (getWhereCondition() != null) {
+ condition = condition.and(getWhereCondition());
+ }
+ int recordsCount = create.select(DSL.count()).from(getTableName()).where(condition).fetchOne(0, int.class);
+ return recordsCount;
+ }
+
+ /**
+ * Helper function that builds a range query from the offset and limit passed<br>
+ */
+ protected String buildRangeQuery(int offset, int limit)
+ {
+ Condition condition = DSL.trueCondition();
+ if (getWhereCondition() != null) {
+ condition = condition.and(getWhereCondition());
+ }
+
+ String sqlQuery;
+ if (getColumnsExpression() != null) {
+ Collection<Field<?>> columns = new ArrayList<>();
+ for (String column : getColumnsExpression().split(",")) {
+ columns.add(field(column));
+ }
+ sqlQuery = create.select((Collection<? extends SelectField<?>>)columns).from(getTableName()).where(condition)
+ .orderBy(field(getKey())).limit(limit).offset(offset).getSQL(ParamType.INLINED);
+ } else {
+ sqlQuery = create.select().from(getTableName()).where(condition).orderBy(field(getKey())).limit(limit)
+ .offset(offset).getSQL(ParamType.INLINED);
+ }
+ LOG.info("DSL Query: " + sqlQuery);
+ return sqlQuery;
}
/**
@@ -638,24 +502,219 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
public void run()
{
while (execute) {
- try {
- long startTs = System.currentTimeMillis();
- if ((isPollable && !isPolled) || !isPollable) {
- pollRecords(ps);
- }
- long endTs = System.currentTimeMillis();
- long ioTime = endTs - startTs;
- long sleepTime = pollInterval - ioTime;
- LOG.debug("pollInterval = {} , I/O time = {} , sleepTime = {}", pollInterval, ioTime, sleepTime);
- Thread.sleep(sleepTime > 0 ? sleepTime : 0);
- } catch (Exception ex) {
- cause.set(ex);
- execute = false;
+ if ((isPollerPartition && !isPolled) || !isPollerPartition) {
+ pollRecords();
}
}
}
}
- private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
+ @VisibleForTesting
+ protected void setScheduledExecutorService(ScheduledExecutorService service)
+ {
+ scanService = service;
+ }
+
+ /**
+ * Gets {@link WindowDataManager}
+ *
+ * @return windowDataManager
+ */
+ public WindowDataManager getWindowManager()
+ {
+ return windowManager;
+ }
+
+ /**
+ * Sets {@link WindowDataManager}
+ *
+ * @param windowDataManager
+ */
+ public void setWindowManager(WindowDataManager windowDataManager)
+ {
+ this.windowManager = windowDataManager;
+ }
+
+ /**
+ * Gets non-polling static partitions count
+ *
+ * @return partitionCount
+ */
+ public int getPartitionCount()
+ {
+ return partitionCount;
+ }
+
+ /**
+ * Sets non-polling static partitions count
+ *
+ * @param partitionCount
+ */
+ public void setPartitionCount(int partitionCount)
+ {
+ this.partitionCount = partitionCount;
+ }
+
+ /**
+ * Returns the where clause
+ *
+ * @return whereCondition
+ */
+ public String getWhereCondition()
+ {
+ return whereCondition;
+ }
+
+ /**
+ * Sets the where clause
+ *
+ * @param whereCondition
+ */
+ public void setWhereCondition(String whereCondition)
+ {
+ this.whereCondition = whereCondition;
+ }
+
+ /**
+ * Returns the list of columns to select from the table
+ *
+ * @return columnsExpression
+ */
+ public String getColumnsExpression()
+ {
+ return columnsExpression;
+ }
+
+ /**
+ * Comma separated list of columns to select from the given table
+ *
+ * @param columnsExpression
+ */
+ public void setColumnsExpression(String columnsExpression)
+ {
+ this.columnsExpression = columnsExpression;
+ }
+
+ /**
+ * Returns the fetchsize for getting the results
+ *
+ * @return fetchSize
+ */
+ public int getFetchSize()
+ {
+ return fetchSize;
+ }
+
+ /**
+ * Sets the fetchsize for getting the results
+ *
+ * @param fetchSize
+ */
+ public void setFetchSize(int fetchSize)
+ {
+ this.fetchSize = fetchSize;
+ }
+
+ /**
+ * Returns the interval for polling the DB
+ *
+ * @return pollInterval
+ */
+ public int getPollInterval()
+ {
+ return pollInterval;
+ }
+
+ /**
+ * Sets the interval for polling the DB
+ *
+ * @param pollInterval
+ */
+ public void setPollInterval(int pollInterval)
+ {
+ this.pollInterval = pollInterval;
+ }
+
+ /**
+ * Returns the capacity of the emit queue
+ *
+ * @return queueCapacity
+ */
+ public int getQueueCapacity()
+ {
+ return queueCapacity;
+ }
+
+ /**
+ * Sets the capacity of the emit queue
+ *
+ * @param queueCapacity
+ */
+ public void setQueueCapacity(int queueCapacity)
+ {
+ this.queueCapacity = queueCapacity;
+ }
+
+ /**
+ * Returns the tableName which would be queried
+ *
+ * @return tableName
+ */
+ public String getTableName()
+ {
+ return tableName;
+ }
+
+ /**
+ * Sets the tableName to query
+ *
+ * @param tableName
+ */
+ public void setTableName(String tableName)
+ {
+ this.tableName = tableName;
+ }
+
+ /**
+ * Returns batchSize indicating the number of elements to emit in a batch
+ *
+ * @return batchSize
+ */
+ public int getBatchSize()
+ {
+ return batchSize;
+ }
+
+ /**
+ * Sets batchSize for number of elements to emit in a batch
+ *
+ * @param batchSize
+ */
+ public void setBatchSize(int batchSize)
+ {
+ this.batchSize = batchSize;
+ }
+
+ /**
+ * Gets the primary key column name
+ *
+ * @return key
+ */
+ public String getKey()
+ {
+ return key;
+ }
+
+ /**
+ * Sets the primary key column name
+ *
+ * @param key
+ */
+ public void setKey(String key)
+ {
+ this.key = key;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
}
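In the operator above, buildRangeQuery delegates SQL generation to jOOQ rather than the hand-built strings of the deleted JdbcMetaDataUtility (next file). A standalone sketch of the same construction, assuming illustrative table and column names; jOOQ renders SQL from a dialect alone, so no live connection is needed:

    import org.jooq.Condition;
    import org.jooq.DSLContext;
    import org.jooq.SQLDialect;
    import org.jooq.conf.ParamType;
    import org.jooq.impl.DSL;

    import static org.jooq.impl.DSL.field;

    public class RangeQuerySketch
    {
      public static void main(String[] args)
      {
        DSLContext create = DSL.using(SQLDialect.MYSQL);
        Condition condition = DSL.trueCondition();          // .and(whereCondition) when one is set
        String sql = create.select(field("event_id"), field("event_name"))
            .from("test_event_table")
            .where(condition)
            .orderBy(field("event_id"))                     // the configured key column
            .limit(10)                                      // size of the range
            .offset(100)                                    // lower bound of the range
            .getSQL(ParamType.INLINED);                     // inline values, as the operator does
        System.out.println(sql);
      }
    }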
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java
deleted file mode 100644
index 9ba353c..0000000
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.db.jdbc;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
-
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * A utility class used to retrieve the metadata for a given unique key of a SQL
- * table. This class would emit range queries based on a primary index given
- *
- * LIMIT clause may not work with all databases or may not return the same
- * results always<br>
- *
- * This utility has been tested with MySQL and where clause is supported<br>
- *
- * @Input - dbName,tableName, primaryKey
- * @Output - map<operatorId,prepared statement>
- *
- */
-@Evolving
-public class JdbcMetaDataUtility
-{
- private static String DB_DRIVER = "com.mysql.jdbc.Driver";
- private static String DB_CONNECTION = "";
- private static String DB_USER = "";
- private static String DB_PASSWORD = "";
- private static String TABLE_NAME = "";
- private static String KEY_COLUMN = "";
- private static String WHERE_CLAUSE = null;
- private static String COLUMN_LIST = null;
-
- private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
-
- public JdbcMetaDataUtility()
- {
-
- }
-
- public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
- {
- DB_CONNECTION = dbConnection;
- DB_USER = userName;
- DB_PASSWORD = password;
- TABLE_NAME = tableName;
- KEY_COLUMN = key;
- }
-
- /**
- * Returns the database connection handle
- * */
- private static Connection getDBConnection()
- {
-
- Connection dbConnection = null;
-
- try {
- Class.forName(DB_DRIVER);
- } catch (ClassNotFoundException e) {
- LOG.error("Driver not found", e);
- throw new RuntimeException(e);
- }
-
- try {
- dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
- return dbConnection;
- } catch (SQLException e) {
- LOG.error("Exception in getting connection handle", e);
- throw new RuntimeException(e);
- }
-
- }
-
- private static String generateQueryString()
- {
- StringBuilder sb = new StringBuilder();
- sb.append("SELECT COUNT(*) as RowCount from " + TABLE_NAME);
-
- if (WHERE_CLAUSE != null) {
- sb.append(" WHERE " + WHERE_CLAUSE);
- }
-
- return sb.toString();
- }
-
- /**
- * Finds the total number of rows in the table
- */
- private static long getRecordRange(String query) throws SQLException
- {
- long rowCount = 0;
- Connection dbConnection = null;
- PreparedStatement preparedStatement = null;
-
- try {
- dbConnection = getDBConnection();
- preparedStatement = dbConnection.prepareStatement(query);
-
- ResultSet rs = preparedStatement.executeQuery();
-
- while (rs.next()) {
- rowCount = Long.parseLong(rs.getString(1));
- LOG.debug("# Rows - " + rowCount);
- }
-
- } catch (SQLException e) {
- LOG.error("Exception in retreiving result set", e);
- throw new RuntimeException(e);
- } finally {
- if (preparedStatement != null) {
- preparedStatement.close();
- }
- if (dbConnection != null) {
- dbConnection.close();
- }
- }
- return rowCount;
- }
-
- /**
- * Returns a pair of <upper,lower> bounds for each partition of the
- * {@link JdbcPollInputOperator}}
- */
- private static KeyValPair<String, String> getQueryBounds(long lower, long upper) throws SQLException
- {
- Connection dbConnection = null;
- PreparedStatement psLowerBound = null;
- PreparedStatement psUpperBound = null;
-
- StringBuilder lowerBound = new StringBuilder();
- StringBuilder upperBound = new StringBuilder();
-
- KeyValPair<String, String> boundedQUeryPair = null;
-
- try {
- dbConnection = getDBConnection();
-
- /*
- * Run this loop only for n-1 partitions.
- * By default the last partition will have fewer records, since we are rounding off
- * */
-
- lowerBound.append("SELECT " + KEY_COLUMN + " FROM " + TABLE_NAME);
- upperBound.append("SELECT " + KEY_COLUMN + " FROM " + TABLE_NAME);
-
- if (WHERE_CLAUSE != null) {
- lowerBound.append(" WHERE " + WHERE_CLAUSE);
- upperBound.append(" WHERE " + WHERE_CLAUSE);
- }
-
- lowerBound.append(" LIMIT " + (0 + lower) + ",1");
- upperBound.append(" LIMIT " + (upper - 1) + ",1");
-
- psLowerBound = dbConnection.prepareStatement(lowerBound.toString());
- psUpperBound = dbConnection.prepareStatement(upperBound.toString());
-
- ResultSet rsLower = psLowerBound.executeQuery();
- ResultSet rsUpper = psUpperBound.executeQuery();
-
- String lowerVal = null;
- String upperVal = null;
-
- while (rsLower.next()) {
- lowerVal = rsLower.getString(KEY_COLUMN);
- }
-
- while (rsUpper.next()) {
- upperVal = rsUpper.getString(KEY_COLUMN);
- }
-
- boundedQUeryPair = new KeyValPair<String, String>(lowerVal, upperVal);
-
- } catch (SQLException e) {
- LOG.error("Exception in getting bounds for queries");
- throw new RuntimeException(e);
- } finally {
- if (psLowerBound != null) {
- psLowerBound.close();
- }
- if (psUpperBound != null) {
- psUpperBound.close();
- }
- if (dbConnection != null) {
- dbConnection.close();
- }
- }
- return boundedQUeryPair;
-
- }
-
- /**
- * Returns a map of partitionId to a KeyValPair of <lower,upper> of the query
- * range<br>
- * Ensures even distribution of records across partitions except the last
- * partition By default the last partition for batch queries will have fewer
- * records, since we are rounding off
- *
- * @throws SQLException
- *
- */
- private static HashMap<Integer, KeyValPair<String, String>> getRangeQueries(int numberOfPartitions, int events,
- long rowCount) throws SQLException
- {
- HashMap<Integer, KeyValPair<String, String>> partitionToQueryMap = new HashMap<Integer, KeyValPair<String, String>>();
- for (int i = 0, lowerOffset = 0, upperOffset = events; i < numberOfPartitions
- - 1; i++, lowerOffset += events, upperOffset += events) {
-
- partitionToQueryMap.put(i, getQueryBounds(lowerOffset, upperOffset));
- }
-
- //Call to construct the lower and upper bounds for the last partition
- partitionToQueryMap.put(numberOfPartitions - 1, getQueryBounds(events * (numberOfPartitions - 1), rowCount));
-
- LOG.info("Partition map - " + partitionToQueryMap.toString());
-
- return partitionToQueryMap;
- }
-
- /**
- * Helper function returns a range query based on the bounds passed<br>
- * Invoked from the setup method of {@link - AbstractJdbcPollInputOperator} to
- * initialize the preparedStatement in the given operator<br>
- * Optional whereClause for conditional selections Optional columnList for
- * projection
- */
- public static String buildRangeQuery(String tableName, String keyColumn, String lower, String upper)
- {
-
- StringBuilder sb = new StringBuilder();
- sb.append("SELECT ");
- if (COLUMN_LIST != null) {
- sb.append(COLUMN_LIST);
- } else {
- sb.append("*");
- }
- sb.append(" from " + tableName + " WHERE ");
- if (WHERE_CLAUSE != null) {
- sb.append(WHERE_CLAUSE + " AND ");
- }
- sb.append(keyColumn + " BETWEEN '" + lower + "' AND '" + upper + "'");
- return sb.toString();
- }
-
- /**
- * Helper function that constructs a query from the next highest key after an
- * operator is restarted
- */
- public static String buildGTRangeQuery(String tableName, String keyColumn, String lower, String upper)
- {
- StringBuilder sb = new StringBuilder();
- sb.append("SELECT ");
- if (COLUMN_LIST != null) {
- sb.append(COLUMN_LIST);
- } else {
- sb.append("*");
- }
- sb.append(" from " + tableName + " WHERE ");
- if (WHERE_CLAUSE != null) {
- sb.append(WHERE_CLAUSE + " AND ");
- }
- sb.append(keyColumn + " > '" + lower + "' AND " + keyColumn + " <= '" + upper + "'");
- return sb.toString();
- }
-
- /**
- * Helper function that constructs a query for polling from outside the given
- * range
- */
- public static String buildPollableQuery(String tableName, String keyColumn, String lower)
- {
- StringBuilder sb = new StringBuilder();
- sb.append("SELECT ");
- if (COLUMN_LIST != null) {
- sb.append(COLUMN_LIST);
- } else {
- sb.append("*");
- }
- sb.append(" from " + tableName + " WHERE ");
- if (WHERE_CLAUSE != null) {
- sb.append(WHERE_CLAUSE + " AND ");
- }
- sb.append(keyColumn + " > '" + lower + "' ");
- return sb.toString();
- }
-
- /**
- * Called by the partitioner from {@link - AbstractJdbcPollInputOperator}<br>
- * Finds the range of query per partition<br>
- * Returns a map of partitionId to PreparedStatement based on the range
- * computed
- *
- * @throws SQLException
- */
- public static HashMap<Integer, KeyValPair<String, String>> getPartitionedQueryMap(int partitions, String dbDriver,
- String dbConnection, String tableName, String key, String userName, String password, String whereClause,
- String columnList) throws SQLException
- {
- long rowCount = 0L;
- try {
- DB_CONNECTION = dbConnection;
- DB_USER = userName;
- DB_PASSWORD = password;
- TABLE_NAME = tableName;
- KEY_COLUMN = key;
- WHERE_CLAUSE = whereClause;
- COLUMN_LIST = columnList;
- DB_DRIVER = dbDriver;
- rowCount = getRecordRange(generateQueryString());
- } catch (SQLException e) {
- LOG.error("Exception in getting the record range", e);
- }
- return getRangeQueries(partitions, getOffset(rowCount, partitions), rowCount);
- }
-
- /**
- * Returns the rounded offset to arrive at a range query
- */
- private static int getOffset(long rowCount, int partitions)
- {
- if (rowCount % partitions == 0) {
- return (int)(rowCount / partitions);
- } else {
- return (int)((rowCount - (rowCount % (partitions - 1))) / (partitions - 1));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
new file mode 100644
index 0000000..91821be
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator.ActiveFieldInfo;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * A concrete implementation for {@link AbstractJdbcPollInputOperator} to
+ * consume data from a JDBC store and emit a POJO for each record <br>
+ *
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc
+ */
+@Evolving
+public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Object>
+{
+ private final transient List<ActiveFieldInfo> columnFieldSetters = Lists.newArrayList();
+ protected List<Integer> columnDataTypes;
+ protected transient Class<?> pojoClass;
+ @NotNull
+ private List<FieldInfo> fieldInfos;
+
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
+ {
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+ };
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ try {
+ // closing the query statement in super class as it is not needed
+ if (getColumnsExpression() == null) {
+ StringBuilder columns = new StringBuilder();
+ for (int i = 0; i < fieldInfos.size(); i++) {
+ columns.append(fieldInfos.get(i).getColumnName());
+ if (i < fieldInfos.size() - 1) {
+ columns.append(",");
+ }
+ }
+ setColumnsExpression(columns.toString());
+ LOG.debug("select expr {}", columns.toString());
+ }
+
+ if (columnDataTypes == null) {
+ populateColumnDataTypes();
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ for (FieldInfo fi : fieldInfos) {
+ columnFieldSetters.add(new ActiveFieldInfo(fi));
+ }
+ }
+
+ @Override
+ public void activate(Context.OperatorContext context)
+ {
+ for (int i = 0; i < columnDataTypes.size(); i++) {
+ final int type = columnDataTypes.get(i);
+ JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldSetters.get(i);
+ switch (type) {
+ case (Types.CHAR):
+ case (Types.VARCHAR):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(), String.class);
+ break;
+
+ case (Types.BOOLEAN):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.TINYINT):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.SMALLINT):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.INTEGER):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.BIGINT):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.FLOAT):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.DOUBLE):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterDouble(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case Types.DECIMAL:
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(), BigDecimal.class);
+ break;
+
+ case Types.TIMESTAMP:
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(), Timestamp.class);
+ break;
+
+ case Types.TIME:
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(), Time.class);
+ break;
+
+ case Types.DATE:
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(), Date.class);
+ break;
+
+ default:
+ throw new RuntimeException("unsupported data type " + type);
+ }
+ }
+ super.activate(context);
+ }
+
+ protected void populateColumnDataTypes() throws SQLException
+ {
+ columnDataTypes = Lists.newArrayList();
+ PreparedStatement stmt = store.getConnection().prepareStatement(buildRangeQuery(1, 1));
+ try (ResultSet rs = stmt.executeQuery()) {
+ Map<String, Integer> nameToType = Maps.newHashMap();
+ ResultSetMetaData rsMetaData = rs.getMetaData();
+ LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
+
+ for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
+ int type = rsMetaData.getColumnType(i);
+ String name = rsMetaData.getColumnName(i);
+ LOG.debug("column name {} type {}", name, type);
+ nameToType.put(name, type);
+ }
+
+ for (FieldInfo fieldInfo : fieldInfos) {
+ columnDataTypes.add(nameToType.get(fieldInfo.getColumnName()));
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object getTuple(ResultSet result)
+ {
+ Object obj;
+ try {
+ obj = pojoClass.newInstance();
+ } catch (InstantiationException | IllegalAccessException ex) {
+ store.disconnect();
+ throw new RuntimeException(ex);
+ }
+
+ try {
+ for (int i = 0; i < fieldInfos.size(); i++) {
+ int type = columnDataTypes.get(i);
+ ActiveFieldInfo afi = columnFieldSetters.get(i);
+ switch (type) {
+ case Types.CHAR:
+ case Types.VARCHAR:
+ String strVal = result.getString(i + 1);
+ ((PojoUtils.Setter<Object, String>)afi.setterOrGetter).set(obj, strVal);
+ break;
+
+ case Types.BOOLEAN:
+ boolean boolVal = result.getBoolean(i + 1);
+ ((PojoUtils.SetterBoolean<Object>)afi.setterOrGetter).set(obj, boolVal);
+ break;
+
+ case Types.TINYINT:
+ byte byteVal = result.getByte(i + 1);
+ ((PojoUtils.SetterByte<Object>)afi.setterOrGetter).set(obj, byteVal);
+ break;
+
+ case Types.SMALLINT:
+ short shortVal = result.getShort(i + 1);
+ ((PojoUtils.SetterShort<Object>)afi.setterOrGetter).set(obj, shortVal);
+ break;
+
+ case Types.INTEGER:
+ int intVal = result.getInt(i + 1);
+ ((PojoUtils.SetterInt<Object>)afi.setterOrGetter).set(obj, intVal);
+ break;
+
+ case Types.BIGINT:
+ long longVal = result.getLong(i + 1);
+ ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(obj, longVal);
+ break;
+
+ case Types.FLOAT:
+ float floatVal = result.getFloat(i + 1);
+ ((PojoUtils.SetterFloat<Object>)afi.setterOrGetter).set(obj, floatVal);
+ break;
+
+ case Types.DOUBLE:
+ double doubleVal = result.getDouble(i + 1);
+ ((PojoUtils.SetterDouble<Object>)afi.setterOrGetter).set(obj, doubleVal);
+ break;
+
+ case Types.DECIMAL:
+ BigDecimal bdVal = result.getBigDecimal(i + 1);
+ ((PojoUtils.Setter<Object, BigDecimal>)afi.setterOrGetter).set(obj, bdVal);
+ break;
+
+ case Types.TIMESTAMP:
+ Timestamp tsVal = result.getTimestamp(i + 1);
+ ((PojoUtils.Setter<Object, Timestamp>)afi.setterOrGetter).set(obj, tsVal);
+ break;
+
+ case Types.TIME:
+ Time timeVal = result.getTime(i + 1);
+ ((PojoUtils.Setter<Object, Time>)afi.setterOrGetter).set(obj, timeVal);
+ break;
+
+ case Types.DATE:
+ Date dateVal = result.getDate(i + 1);
+ ((PojoUtils.Setter<Object, Date>)afi.setterOrGetter).set(obj, dateVal);
+ break;
+
+ default:
+ throw new RuntimeException("unsupported data type " + type);
+ }
+ }
+ return obj;
+ } catch (SQLException e) {
+ store.disconnect();
+ throw new RuntimeException("fetching metadata", e);
+ }
+ }
+
+ @Override
+ protected void emitTuple(Object obj)
+ {
+ outputPort.emit(obj);
+ }
+
+ /**
+ * A list of {@link FieldInfo}s where each item maps a column name to a pojo
+ * field name.
+ */
+ public List<FieldInfo> getFieldInfos()
+ {
+ return fieldInfos;
+ }
+
+ /**
+ * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a
+ * pojo field name.<br/>
+ * The value from fieldInfo.column is assigned to
+ * fieldInfo.pojoFieldExpression.
+ *
+ * @description $[].columnName name of the database column
+ * @description $[].pojoFieldExpression pojo field name or expression
+ * @useSchema $[].pojoFieldExpression outputPort.fields[].name
+ */
+ public void setFieldInfos(List<FieldInfo> fieldInfos)
+ {
+ this.fieldInfos = fieldInfos;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOPollInputOperator.class);
+}
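The POJO variant above maps result-set columns to POJO fields through FieldInfo entries and requires the TUPLE_CLASS attribute on its output port (schemaRequired = true). A sketch of building the mapping list, assuming a hypothetical table with event_id and event_name columns and a tuple class exposing eventId and eventName fields:

    import java.util.List;

    import com.google.common.collect.Lists;

    import com.datatorrent.lib.util.FieldInfo;
    import com.datatorrent.lib.util.FieldInfo.SupportType;

    public class FieldInfoSketch
    {
      // Builds the column-to-POJO mapping the operator expects;
      // FieldInfo(columnName, pojoFieldExpression, supportType).
      public static List<FieldInfo> buildFieldInfos()
      {
        List<FieldInfo> fieldInfos = Lists.newArrayList();
        fieldInfos.add(new FieldInfo("event_id", "eventId", SupportType.INTEGER));
        fieldInfos.add(new FieldInfo("event_name", "eventName", SupportType.STRING));
        return fieldInfos;
      }
    }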
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
index 45f96bf..518ac17 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
@@ -18,214 +18,66 @@
*/
package com.datatorrent.lib.db.jdbc;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import com.google.common.collect.Lists;
import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
/**
- * A concrete implementation for {@link AbstractJdbcPollInputOperator}} for
- * consuming data from MySQL using JDBC interface <br>
- * User needs to provide tableName,dbConnection,setEmitColumnList,look-up key
- * <br>
- * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
- * <br>
- * This operator uses static partitioning to arrive at range queries for exactly
- * once reads<br>
- * Assumption is that there is an ordered column using which range queries can
- * be formed<br>
- * If an emitColumnList is provided, please ensure that the keyColumn is the
- * first column in the list<br>
- * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
- * comma separated list of the emit columns eg columnA,columnB,columnC
+ * A concrete implementation for {@link AbstractJdbcPollInputOperator} to
+ * consume data from a JDBC store and emit comma separated values <br>
*
* @displayName Jdbc Polling Input Operator
* @category Input
* @tags database, sql, jdbc
*/
@Evolving
-@OperatorAnnotation(checkpointableWithinAppWindow = false)
-public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<Object>
+public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<String>
{
- private long lastBatchWindowId;
- private transient long currentWindowId;
- private long lastCreationTsMillis;
- private long fetchBackMillis = 0L;
+ @OutputPortFieldAnnotation
+ public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();
private ArrayList<String> emitColumns;
- private transient int count = 0;
-
- /**
- * Returns the emit columns
- */
- public List<String> getEmitColumns()
- {
- return emitColumns;
- }
-
- /**
- * Sets the emit columns
- */
- public void setEmitColumns(ArrayList<String> emitColumns)
- {
- this.emitColumns = emitColumns;
- }
-
- /**
- * Returns fetchBackMilis
- */
- public long getFetchBackMillis()
- {
- return fetchBackMillis;
- }
-
- /**
- * Sets fetchBackMilis - used in polling
- */
- public void setFetchBackMillis(long fetchBackMillis)
- {
- this.fetchBackMillis = fetchBackMillis;
- }
@Override
public void setup(OperatorContext context)
{
super.setup(context);
- parseEmitColumnList(getEmitColumnList());
- lastCreationTsMillis = System.currentTimeMillis() - fetchBackMillis;
+ parseEmitColumnList();
}
- private void parseEmitColumnList(String columnList)
+ private void parseEmitColumnList()
{
- String[] cols = columnList.split(",");
- ArrayList<String> arr = Lists.newArrayList();
+ String[] cols = getColumnsExpression().split(",");
+ emitColumns = Lists.newArrayList();
for (int i = 0; i < cols.length; i++) {
- arr.add(cols[i]);
+ emitColumns.add(cols[i]);
}
- setEmitColumns(arr);
- }
-
- @Override
- public void beginWindow(long l)
- {
- super.beginWindow(l);
- currentWindowId = l;
}
@Override
- protected void pollRecords(PreparedStatement ps)
+ public String getTuple(ResultSet rs)
{
- ResultSet rs = null;
- List<Object> metaList = new ArrayList<>();
-
- if (isReplayed) {
- return;
- }
-
+ StringBuilder resultTuple = new StringBuilder();
try {
- if (ps.isClosed()) {
- LOG.debug("Returning due to closed ps for non-pollable partitions");
- return;
+ for (String obj : emitColumns) {
+ resultTuple.append(rs.getObject(obj) + ",");
}
+ return resultTuple.substring(0, resultTuple.length() - 1); //remove last comma
} catch (SQLException e) {
- LOG.error("Prepared statement is closed", e);
throw new RuntimeException(e);
}
-
- try (PreparedStatement pStat = ps;) {
- pStat.setFetchSize(getFetchSize());
- LOG.debug("sql query = {}", pStat);
- rs = pStat.executeQuery();
- boolean hasNext = false;
-
- if (rs == null || rs.isClosed()) {
- return;
- }
-
- while ((hasNext = rs.next())) {
- Object key = null;
- StringBuilder resultTuple = new StringBuilder();
- try {
- if (count < getBatchSize()) {
- key = rs.getObject(getKey());
- for (String obj : emitColumns) {
- resultTuple.append(rs.getObject(obj) + ",");
- }
- metaList.add(resultTuple.substring(0, resultTuple.length() - 1));
- count++;
- } else {
- emitQueue.add(metaList);
- metaList = new ArrayList<>();
- key = rs.getObject(getKey());
- for (String obj : emitColumns) {
- resultTuple.append(rs.getObject(obj) + ",");
- }
- metaList.add(resultTuple.substring(0, resultTuple.length() - 1));
- count = 0;
- }
- } catch (NullPointerException npe) {
- LOG.error("Key not found" + npe);
- throw new RuntimeException(npe);
- }
- if (isPollable) {
- highestPolled = key.toString();
- isPolled = true;
- }
- }
- /*Flush the remaining records once the result set is over and batch-size is not reached,
- * Dont flush if its pollable*/
- if (!hasNext) {
- if ((isPollable && isPolled) || !isPollable) {
- emitQueue.offer(metaList);
- metaList = new ArrayList<>();
- count = 0;
- }
- if (!isPolled) {
- isPolled = true;
- }
- }
- LOG.debug("last creation time stamp = {}", lastCreationTsMillis);
- } catch (SQLException ex) {
- throw new RuntimeException(ex);
- }
}
@Override
- public void emitTuples()
+ protected void emitTuple(String tuple)
{
- if (isReplayed) {
- LOG.debug(
- "Calling emit tuples during window - " + currentWindowId + "::" + windowManager.getLargestRecoveryWindow());
- LOG.debug("Returning for replayed window");
- return;
- }
-
- List<Object> tuples;
-
- if (lastBatchWindowId < currentWindowId) {
- if ((tuples = emitQueue.poll()) != null) {
- for (Object tuple : tuples) {
- String[] str = tuple.toString().split(",");
- if (lower == null) {
- lower = str[0];
- }
- upper = str[0];
- outputPort.emit(tuple);
- }
- lastBatchWindowId = currentWindowId;
- }
- }
+ outputPort.emit(tuple);
}
-
- private static final Logger LOG = LoggerFactory.getLogger(JdbcPollInputOperator.class);
}
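With pollRecords(PreparedStatement) removed from the contract, a concrete operator now only implements getTuple(ResultSet) to convert a row and emitTuple(T) to forward it, as the two implementations in this commit do. A minimal custom subclass illustrating those extension points, assuming a single integer column event_id:

    import java.sql.ResultSet;
    import java.sql.SQLException;

    import com.datatorrent.api.DefaultOutputPort;
    import com.datatorrent.lib.db.jdbc.AbstractJdbcPollInputOperator;

    public class IdOnlyPollInputOperator extends AbstractJdbcPollInputOperator<Integer>
    {
      public final transient DefaultOutputPort<Integer> outputPort = new DefaultOutputPort<>();

      // Convert the current row; called from the polling thread.
      @Override
      public Integer getTuple(ResultSet result)
      {
        try {
          return result.getInt("event_id");
        } catch (SQLException e) {
          throw new RuntimeException(e);
        }
      }

      // Forward a dequeued tuple; called from emitTuples() in the operator thread.
      @Override
      protected void emitTuple(Integer tuple)
      {
        outputPort.emit(tuple);
      }
    }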