You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/07/15 06:04:58 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2066 JdbcPolling, idempotent,
partitionable
Repository: apex-malhar
Updated Branches:
refs/heads/master 132012823 -> aaa4464f0
APEXMALHAR-2066 JdbcPolling,idempotent,partitionable
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4c7d268d
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4c7d268d
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4c7d268d
Branch: refs/heads/master
Commit: 4c7d268dbcdb6adbe84daadac6787c4d2f6b203d
Parents: 9b62506
Author: devtagare <de...@gmail.com>
Authored: Wed May 18 15:25:56 2016 -0700
Committer: devtagare <de...@gmail.com>
Committed: Thu Jul 14 11:42:36 2016 -0700
----------------------------------------------------------------------
.../db/jdbc/AbstractJdbcPollInputOperator.java | 661 +++++++++++++++++++
.../lib/db/jdbc/JdbcMetaDataUtility.java | 353 ++++++++++
.../lib/db/jdbc/JdbcPollInputOperator.java | 231 +++++++
.../datatorrent/lib/db/jdbc/JdbcPollerTest.java | 246 +++++++
4 files changed, 1491 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4c7d268d/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
new file mode 100644
index 0000000..ab12ed3
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -0,0 +1,661 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Abstract operator for consuming data using the JDBC interface.<br>
+ * Users need to provide tableName, dbConnection, emitColumnList and the
+ * look-up key.<br>
+ * Optionally batchSize, pollInterval and a where clause can be given.<br>
+ * This operator uses static partitioning to arrive at range queries for
+ * exactly-once reads.<br>
+ * It creates a configured number of non-polling static partitions for
+ * fetching the existing data in the table, plus one additional partition that
+ * polls for newly added data. The assumption is that there is an ordered
+ * column using which range queries can be formed.<br>
+ * If an emitColumnList is provided, please ensure that the keyColumn is the
+ * first column in the list.<br>
+ * Range queries are formed using {@link JdbcMetaDataUtility}. Output - comma
+ * separated list of the emit columns, e.g. columnA,columnB,columnC<br>
+ * Only newly added data which has increasing ids will be fetched by the
+ * polling JDBC partition.
+ *
+ * In future iterations this operator would support an in-clause for
+ * idempotency, instead of only range queries, to support non-ordered key
+ * columns.
+ *
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc, partitionable,exactlyOnce
+ */
+@Evolving
+public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore>
+    implements ActivationListener<Context>, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator<T>>
+{
+  /**
+   * Keys of the JDBC connection properties holding the credentials that are
+   * handed to {@link JdbcMetaDataUtility} when computing partition ranges.
+   * ("user"/"password" are the standard JDBC property names.)
+   */
+  private static final String USER_PROPERTY = "user";
+  private static final String PASSWORD_PROPERTY = "password";
+
+  /**
+   * Poll interval in milliseconds. This is instance (not static) state so that
+   * the configured value travels with each serialized partition.
+   */
+  private int pollInterval = 10000;
+
+  @Min(1)
+  private int partitionCount = 1;
+  protected transient int operatorId;
+  protected transient boolean isReplayed;
+  /** Written by the operator thread and read by the {@link DBPoller} thread. */
+  protected transient volatile boolean isPollable;
+  protected int batchSize;
+  /** JDBC fetch size hint applied when executing queries (e.g. during replay). */
+  protected int fetchSize = 20000;
+  /**
+   * Pair of <lower bound, upper bound> of the range key emitted in the current
+   * window; saved to the window data manager in endWindow.
+   */
+  protected transient MutablePair<String, String> currentWindowRecoveryState;
+
+  /**
+   * size of the emit queue used to hold polled records before emit
+   */
+  private int queueCapacity = 4 * 1024 * 1024;
+  private transient volatile boolean execute;
+  private transient AtomicReference<Throwable> cause;
+  protected transient int spinMillis;
+  private transient OperatorContext context;
+  protected String tableName;
+  protected String key;
+  protected long currentWindowId;
+  protected KeyValPair<String, String> rangeQueryPair;
+  protected String lower;
+  protected String upper;
+  protected boolean recovered;
+  /** Set by the operator thread (begin/endWindow) and read by the {@link DBPoller} thread. */
+  protected volatile boolean isPolled;
+  protected String whereCondition = null;
+  protected String previousUpperBound;
+  protected String highestPolled;
+  /**
+   * thread to poll database
+   */
+  private transient Thread dbPoller;
+  protected transient ArrayBlockingQueue<List<T>> emitQueue;
+  /**
+   * Current query. It is replaced by the operator thread and executed by the
+   * {@link DBPoller} thread, hence volatile.
+   */
+  protected transient volatile PreparedStatement ps;
+  protected WindowDataManager windowManager;
+  private String emitColumnList;
+
+  /**
+   * Returns the where clause
+   */
+  public String getWhereCondition()
+  {
+    return whereCondition;
+  }
+
+  /**
+   * Sets the where clause
+   */
+  public void setWhereCondition(String whereCondition)
+  {
+    this.whereCondition = whereCondition;
+  }
+
+  /**
+   * Returns the list of columns to select from the query
+   */
+  public String getEmitColumnList()
+  {
+    return emitColumnList;
+  }
+
+  /**
+   * Comma separated list of columns to select from the given table
+   */
+  public void setEmitColumnList(String emitColumnList)
+  {
+    this.emitColumnList = emitColumnList;
+  }
+
+  /**
+   * Returns the fetchsize for getting the results
+   */
+  public int getFetchSize()
+  {
+    return fetchSize;
+  }
+
+  /**
+   * Sets the fetchsize for getting the results
+   */
+  public void setFetchSize(int fetchSize)
+  {
+    this.fetchSize = fetchSize;
+  }
+
+  /**
+   * Executes the given statement and queues the fetched records on the emit
+   * queue. Invoked repeatedly from the {@link DBPoller} thread.
+   */
+  protected abstract void pollRecords(PreparedStatement ps);
+
+  /**
+   * Returns the interval for polling the queue
+   */
+  public int getPollInterval()
+  {
+    return pollInterval;
+  }
+
+  /**
+   * Sets the interval for polling the emit queue
+   */
+  public void setPollInterval(int pollInterval)
+  {
+    this.pollInterval = pollInterval;
+  }
+
+  /**
+   * Returns the capacity of the emit queue
+   */
+  public int getQueueCapacity()
+  {
+    return queueCapacity;
+  }
+
+  /**
+   * Sets the capacity of the emit queue
+   */
+  public void setQueueCapacity(int queueCapacity)
+  {
+    this.queueCapacity = queueCapacity;
+  }
+
+  /**
+   * Returns the ordered key used to generate the range queries
+   */
+  public String getKey()
+  {
+    return key;
+  }
+
+  /**
+   * Sets the ordered key used to generate the range queries
+   */
+  public void setKey(String key)
+  {
+    this.key = key;
+  }
+
+  /**
+   * Returns the tableName which would be queried
+   */
+  public String getTableName()
+  {
+    return tableName;
+  }
+
+  /**
+   * Sets the tableName to query
+   */
+  public void setTableName(String tableName)
+  {
+    this.tableName = tableName;
+  }
+
+  /**
+   * Returns rangeQueryPair - <lowerBound,upperBound>
+   */
+  public KeyValPair<String, String> getRangeQueryPair()
+  {
+    return rangeQueryPair;
+  }
+
+  /**
+   * Sets the rangeQueryPair <lowerBound,upperBound>
+   */
+  public void setRangeQueryPair(KeyValPair<String, String> rangeQueryPair)
+  {
+    this.rangeQueryPair = rangeQueryPair;
+  }
+
+  /**
+   * Returns batchSize indicating the number of elements in emitQueue
+   */
+  public int getBatchSize()
+  {
+    return batchSize;
+  }
+
+  /**
+   * Sets batchSize for number of elements in the emitQueue
+   */
+  public void setBatchSize(int batchSize)
+  {
+    this.batchSize = batchSize;
+  }
+
+  public AbstractJdbcPollInputOperator()
+  {
+    currentWindowRecoveryState = new MutablePair<>();
+    windowManager = new FSWindowDataManager();
+  }
+
+  /**
+   * Prepares a forward-only, read-only statement on the store's connection.
+   */
+  private PreparedStatement prepareForwardOnly(String query) throws SQLException
+  {
+    return store.getConnection().prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+    execute = true;
+    cause = new AtomicReference<Throwable>();
+    emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
+    this.context = context;
+    operatorId = context.getId();
+
+    try {
+      // A range partition gets both bounds; the polling partition only has a lower bound
+      if (getRangeQueryPair().getValue() != null) {
+        ps = prepareForwardOnly(JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(),
+            rangeQueryPair.getKey(), rangeQueryPair.getValue()));
+      } else {
+        ps = prepareForwardOnly(
+            JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey()));
+        isPollable = true;
+      }
+    } catch (SQLException e) {
+      LOG.error("Exception in initializing the range query for a given partition", e);
+      throw new RuntimeException(e);
+    }
+
+    windowManager.setup(context);
+    LOG.debug("super setup done...");
+  }
+
+  /**
+   * Replays recovered windows and, after the last recovered window, re-points
+   * the query past the data already emitted. For the polling partition the
+   * query is rebuilt every window from the highest polled key. The poller
+   * thread is started lazily on the first non-replayed window.
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+    isReplayed = false;
+
+    if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
+      try {
+        replay(currentWindowId);
+      } catch (SQLException e) {
+        LOG.error("Exception in replayed windows", e);
+        throw new RuntimeException(e);
+      }
+    }
+
+    if (isReplayed && currentWindowId == windowManager.getLargestRecoveryWindow()) {
+      try {
+        if (!isPollable && rangeQueryPair.getValue() != null) {
+          // Continue the range read strictly after the last replayed key
+          ps = prepareForwardOnly(JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), getKey(),
+              previousUpperBound, rangeQueryPair.getValue()));
+        } else {
+          String bound = (previousUpperBound == null) ? getRangeQueryPair().getKey() : previousUpperBound;
+          ps = prepareForwardOnly(JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound));
+          isPollable = true;
+        }
+        isReplayed = false;
+        LOG.debug("Prepared statement after re-initialization - {} ", ps.toString());
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    // Reset the pollable query with the updated lower bound.
+    // NOTE(review): the previous statement is not closed here; closing it could
+    // race with the poller thread, which may still be executing it.
+    // NOTE(review): if previousUpperBound is set but highestPolled is null, the
+    // bound below is null; confirm pollRecords always populates highestPolled.
+    if (isPollable) {
+      try {
+        String bound = (previousUpperBound == null && highestPolled == null)
+            ? getRangeQueryPair().getKey() : highestPolled;
+        ps = prepareForwardOnly(JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound));
+        LOG.debug("Polling query {} {}", ps.toString(), currentWindowId);
+        isPolled = false;
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    lower = null;
+    upper = null;
+
+    // Start the poller only once, and never from setup: it would conflict with replay
+    if (dbPoller == null && !isReplayed) {
+      LOG.info("Statement when re-initialized {}", ps.toString());
+      dbPoller = new Thread(new DBPoller());
+      dbPoller.start();
+    }
+  }
+
+  /**
+   * Emits one queued batch of polled records, tracking the first and last
+   * emitted values of the window (by their toString()) as recovery bounds.
+   */
+  @Override
+  public void emitTuples()
+  {
+    if (isReplayed) {
+      return;
+    }
+
+    List<T> tuples = emitQueue.poll();
+    if (tuples == null) {
+      return;
+    }
+    for (T tuple : tuples) {
+      String value = tuple.toString();
+      if (lower == null) {
+        lower = value;
+      }
+      upper = value;
+      outputPort.emit(tuple);
+    }
+  }
+
+  /**
+   * For non-recovered windows, persists the <lower, upper> bounds emitted in
+   * this window and, if data was emitted, marks the window as polled.
+   */
+  @Override
+  public void endWindow()
+  {
+    try {
+      if (currentWindowId > windowManager.getLargestRecoveryWindow()) {
+        if (lower != null && upper != null) {
+          previousUpperBound = upper;
+          isPolled = true;
+        }
+        currentWindowRecoveryState = new MutablePair<>(lower, upper);
+        windowManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("saving recovery", e);
+    }
+    currentWindowRecoveryState = new MutablePair<>();
+  }
+
+  public int getPartitionCount()
+  {
+    return partitionCount;
+  }
+
+  public void setPartitionCount(int partitionCount)
+  {
+    this.partitionCount = partitionCount;
+  }
+
+  /**
+   * Currently a no-op: the poller thread is started from beginWindow so that
+   * it does not interfere with replay of recovered windows.
+   */
+  @Override
+  public void activate(Context cntxt)
+  {
+    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID
+        && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) {
+      // If it is a replay state, don't start any threads here
+      return;
+    }
+  }
+
+  /**
+   * Interrupts the poller thread and waits for it to terminate.
+   */
+  @Override
+  public void deactivate()
+  {
+    try {
+      if (dbPoller != null && dbPoller.isAlive()) {
+        dbPoller.interrupt();
+        dbPoller.join();
+      }
+    } catch (InterruptedException ex) {
+      // log and continue shutting down, but keep the interrupt visible to callers
+      LOG.error("exception in poller thread: ", ex);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Sleeps for spinMillis while the poller is healthy; otherwise rethrows the
+   * failure captured by the poller thread.
+   */
+  @Override
+  public void handleIdleTime()
+  {
+    if (execute) {
+      try {
+        Thread.sleep(spinMillis);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+    } else {
+      LOG.error("Exception: ", cause.get());
+      DTThrowable.rethrow(cause.get());
+    }
+  }
+
+  /**
+   * Re-emits the key range recorded for the given recovered window, unless the
+   * window was empty or its upper bound was already reached.
+   */
+  protected void replay(long windowId) throws SQLException
+  {
+    isReplayed = true;
+
+    MutablePair<String, String> recoveredData;
+    try {
+      @SuppressWarnings("unchecked")
+      MutablePair<String, String> loaded = (MutablePair<String, String>)windowManager.load(operatorId, windowId);
+      recoveredData = loaded;
+    } catch (IOException e) {
+      throw new RuntimeException("replay", e);
+    }
+
+    if (recoveredData == null) {
+      return;
+    }
+    // skip the window if there was no incoming data in it
+    if (recoveredData.left == null || recoveredData.right == null) {
+      return;
+    }
+    if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(previousUpperBound)) {
+      LOG.info("Matched so returning");
+      return;
+    }
+
+    isPollable = false;
+    LOG.debug("[Window ID -{},{},{}]", windowId, recoveredData.left, recoveredData.right);
+
+    PreparedStatement replayStatement = prepareForwardOnly(JdbcMetaDataUtility.buildRangeQuery(getTableName(),
+        getKey(), recoveredData.left, recoveredData.right));
+    LOG.info("Query formed for recovered data - {}", replayStatement.toString());
+
+    emitReplayedTuples(replayStatement);
+  }
+
+  /**
+   * Replays the tuples in sync mode for replayed windows. The statement is
+   * closed (which also closes its result set) when done.
+   * NOTE(review): this emits only the key column value, whereas emitTuples
+   * emits whatever pollRecords queued; confirm this matches T in subclasses.
+   */
+  public void emitReplayedTuples(PreparedStatement ps)
+  {
+    LOG.debug("Emitting replayed statement is -{}", ps);
+    try (PreparedStatement pStat = ps) {
+      pStat.setFetchSize(getFetchSize());
+      LOG.debug("sql query = {}", pStat);
+      ResultSet rs = pStat.executeQuery();
+      if (rs == null || rs.isClosed()) {
+        LOG.debug("Nothing to replay");
+        return;
+      }
+      while (rs.next()) {
+        Object keyValue = rs.getObject(getKey());
+        previousUpperBound = keyValue.toString();
+        @SuppressWarnings("unchecked")
+        T tuple = (T)keyValue;
+        outputPort.emit(tuple);
+      }
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Uses a static partitioning scheme to initialize operator partitions with
+   * non-overlapping key ranges to read. In addition to the 'n' range
+   * partitions, the 'n+1'th partition is a polling partition which reads the
+   * records beyond the given range.
+   *
+   * @throws RuntimeException if the partition ranges cannot be computed
+   */
+  @Override
+  public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
+      Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions,
+      com.datatorrent.api.Partitioner.PartitioningContext context)
+  {
+    int rangePartitions = getPartitionCount();
+    List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions =
+        new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(rangePartitions + 1);
+
+    JdbcStore jdbcStore = new JdbcStore();
+    jdbcStore.setDatabaseDriver(store.getDatabaseDriver());
+    jdbcStore.setDatabaseUrl(store.getDatabaseUrl());
+    jdbcStore.setConnectionProperties(store.getConnectionProperties());
+
+    HashMap<Integer, KeyValPair<String, String>> partitionToRangeMap;
+    jdbcStore.connect();
+    try {
+      partitionToRangeMap = JdbcMetaDataUtility.getPartitionedQueryMap(rangePartitions,
+          jdbcStore.getDatabaseDriver(), jdbcStore.getDatabaseUrl(), getTableName(), getKey(),
+          store.getConnectionProperties().getProperty(USER_PROPERTY),
+          store.getConnectionProperties().getProperty(PASSWORD_PROPERTY),
+          whereCondition, emitColumnList);
+    } catch (SQLException e) {
+      // Without the ranges no partition can be configured; fail instead of NPE-ing below
+      LOG.error("Exception in initializing the partition range", e);
+      throw new RuntimeException(e);
+    } finally {
+      jdbcStore.disconnect();
+    }
+
+    KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
+
+    for (int i = 0; i <= rangePartitions; i++) {
+      AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
+
+      jdbcPoller.setStore(store);
+      jdbcPoller.setKey(getKey());
+      jdbcPoller.setPartitionCount(getPartitionCount());
+      jdbcPoller.setPollInterval(getPollInterval());
+      jdbcPoller.setTableName(getTableName());
+      jdbcPoller.setBatchSize(getBatchSize());
+      jdbcPoller.setEmitColumnList(getEmitColumnList());
+
+      // The n given partitions are for range queries and the n+1th is for polling.
+      // The polling partition's upper bound is null; setup() derives isPollable from that.
+      // No store connection is needed here: each partition connects in its own setup().
+      if (i < rangePartitions) {
+        jdbcPoller.setRangeQueryPair(partitionToRangeMap.get(i));
+      } else {
+        jdbcPoller.setRangeQueryPair(new KeyValPair<String, String>(partitionToRangeMap.get(i - 1).getValue(), null));
+      }
+      newPartitions.add(new DefaultPartition<AbstractJdbcPollInputOperator<T>>(jdbcPoller));
+    }
+
+    previousUpperBound = null;
+    return newPartitions;
+  }
+
+  @Override
+  public void partitioned(
+      Map<Integer, com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions)
+  {
+    //Nothing to implement here
+  }
+
+  /**
+   * This class polls a store that can be queried with a JDBC interface. The
+   * preparedStatement is updated by the operator thread as more rows are read.
+   * Any failure is recorded in {@code cause} and stops the loop; it is
+   * rethrown from handleIdleTime.
+   */
+  public class DBPoller implements Runnable
+  {
+    @Override
+    public void run()
+    {
+      while (execute) {
+        try {
+          long startTs = System.currentTimeMillis();
+          // Range partitions poll every cycle. The polling partition stops once
+          // endWindow has marked data as polled, until beginWindow re-arms it.
+          if (!isPollable || !isPolled) {
+            pollRecords(ps);
+          }
+          long ioTime = System.currentTimeMillis() - startTs;
+          long sleepTime = pollInterval - ioTime;
+          LOG.debug("pollInterval = {} , I/O time = {} , sleepTime = {}", pollInterval, ioTime, sleepTime);
+          Thread.sleep(Math.max(sleepTime, 0));
+        } catch (Exception ex) {
+          cause.set(ex);
+          execute = false;
+        }
+      }
+    }
+  }
+
+  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4c7d268d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java
new file mode 100644
index 0000000..9ba353c
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * A utility class used to retrieve the metadata for a given unique key of a SQL
+ * table. This class would emit range queries based on a primary index given
+ *
+ * LIMIT clause may not work with all databases or may not return the same
+ * results always<br>
+ *
+ * This utility has been tested with MySQL and where clause is supported<br>
+ *
+ * @Input - dbName,tableName, primaryKey
+ * @Output - map<operatorId,prepared statement>
+ *
+ */
+@Evolving
+public class JdbcMetaDataUtility
+{
+ private static String DB_DRIVER = "com.mysql.jdbc.Driver";
+ private static String DB_CONNECTION = "";
+ private static String DB_USER = "";
+ private static String DB_PASSWORD = "";
+ private static String TABLE_NAME = "";
+ private static String KEY_COLUMN = "";
+ private static String WHERE_CLAUSE = null;
+ private static String COLUMN_LIST = null;
+
+ private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
+
+ public JdbcMetaDataUtility()
+ {
+
+ }
+
+ public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
+ {
+ DB_CONNECTION = dbConnection;
+ DB_USER = userName;
+ DB_PASSWORD = password;
+ TABLE_NAME = tableName;
+ KEY_COLUMN = key;
+ }
+
+ /**
+ * Returns the database connection handle
+ * */
+ private static Connection getDBConnection()
+ {
+
+ Connection dbConnection = null;
+
+ try {
+ Class.forName(DB_DRIVER);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Driver not found", e);
+ throw new RuntimeException(e);
+ }
+
+ try {
+ dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+ return dbConnection;
+ } catch (SQLException e) {
+ LOG.error("Exception in getting connection handle", e);
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ private static String generateQueryString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT COUNT(*) as RowCount from " + TABLE_NAME);
+
+ if (WHERE_CLAUSE != null) {
+ sb.append(" WHERE " + WHERE_CLAUSE);
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Finds the total number of rows in the table
+ */
+ private static long getRecordRange(String query) throws SQLException
+ {
+ long rowCount = 0;
+ Connection dbConnection = null;
+ PreparedStatement preparedStatement = null;
+
+ try {
+ dbConnection = getDBConnection();
+ preparedStatement = dbConnection.prepareStatement(query);
+
+ ResultSet rs = preparedStatement.executeQuery();
+
+ while (rs.next()) {
+ rowCount = Long.parseLong(rs.getString(1));
+ LOG.debug("# Rows - " + rowCount);
+ }
+
+ } catch (SQLException e) {
+ LOG.error("Exception in retreiving result set", e);
+ throw new RuntimeException(e);
+ } finally {
+ if (preparedStatement != null) {
+ preparedStatement.close();
+ }
+ if (dbConnection != null) {
+ dbConnection.close();
+ }
+ }
+ return rowCount;
+ }
+
+ /**
+ * Returns a pair of <upper,lower> bounds for each partition of the
+ * {@link JdbcPollInputOperator}}
+ */
+ private static KeyValPair<String, String> getQueryBounds(long lower, long upper) throws SQLException
+ {
+ Connection dbConnection = null;
+ PreparedStatement psLowerBound = null;
+ PreparedStatement psUpperBound = null;
+
+ StringBuilder lowerBound = new StringBuilder();
+ StringBuilder upperBound = new StringBuilder();
+
+ KeyValPair<String, String> boundedQUeryPair = null;
+
+ try {
+ dbConnection = getDBConnection();
+
+ /*
+ * Run this loop only for n-1 partitions.
+ * By default the last partition will have fewer records, since we are rounding off
+ * */
+
+ lowerBound.append("SELECT " + KEY_COLUMN + " FROM " + TABLE_NAME);
+ upperBound.append("SELECT " + KEY_COLUMN + " FROM " + TABLE_NAME);
+
+ if (WHERE_CLAUSE != null) {
+ lowerBound.append(" WHERE " + WHERE_CLAUSE);
+ upperBound.append(" WHERE " + WHERE_CLAUSE);
+ }
+
+ lowerBound.append(" LIMIT " + (0 + lower) + ",1");
+ upperBound.append(" LIMIT " + (upper - 1) + ",1");
+
+ psLowerBound = dbConnection.prepareStatement(lowerBound.toString());
+ psUpperBound = dbConnection.prepareStatement(upperBound.toString());
+
+ ResultSet rsLower = psLowerBound.executeQuery();
+ ResultSet rsUpper = psUpperBound.executeQuery();
+
+ String lowerVal = null;
+ String upperVal = null;
+
+ while (rsLower.next()) {
+ lowerVal = rsLower.getString(KEY_COLUMN);
+ }
+
+ while (rsUpper.next()) {
+ upperVal = rsUpper.getString(KEY_COLUMN);
+ }
+
+ boundedQUeryPair = new KeyValPair<String, String>(lowerVal, upperVal);
+
+ } catch (SQLException e) {
+ LOG.error("Exception in getting bounds for queries");
+ throw new RuntimeException(e);
+ } finally {
+ if (psLowerBound != null) {
+ psLowerBound.close();
+ }
+ if (psUpperBound != null) {
+ psUpperBound.close();
+ }
+ if (dbConnection != null) {
+ dbConnection.close();
+ }
+ }
+ return boundedQUeryPair;
+
+ }
+
+ /**
+ * Returns a map of partitionId to a KeyValPair of <lower,upper> of the query
+ * range<br>
+ * Ensures even distribution of records across partitions except the last
+ * partition By default the last partition for batch queries will have fewer
+ * records, since we are rounding off
+ *
+ * @throws SQLException
+ *
+ */
+ private static HashMap<Integer, KeyValPair<String, String>> getRangeQueries(int numberOfPartitions, int events,
+ long rowCount) throws SQLException
+ {
+ HashMap<Integer, KeyValPair<String, String>> partitionToQueryMap = new HashMap<Integer, KeyValPair<String, String>>();
+ for (int i = 0, lowerOffset = 0, upperOffset = events; i < numberOfPartitions
+ - 1; i++, lowerOffset += events, upperOffset += events) {
+
+ partitionToQueryMap.put(i, getQueryBounds(lowerOffset, upperOffset));
+ }
+
+ //Call to construct the lower and upper bounds for the last partition
+ partitionToQueryMap.put(numberOfPartitions - 1, getQueryBounds(events * (numberOfPartitions - 1), rowCount));
+
+ LOG.info("Partition map - " + partitionToQueryMap.toString());
+
+ return partitionToQueryMap;
+ }
+
+ /**
+ * Helper function returns a range query based on the bounds passed<br>
+ * Invoked from the setup method of {@link - AbstractJdbcPollInputOperator} to
+ * initialize the preparedStatement in the given operator<br>
+ * Optional whereClause for conditional selections Optional columnList for
+ * projection
+ */
+ public static String buildRangeQuery(String tableName, String keyColumn, String lower, String upper)
+ {
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ");
+ if (COLUMN_LIST != null) {
+ sb.append(COLUMN_LIST);
+ } else {
+ sb.append("*");
+ }
+ sb.append(" from " + tableName + " WHERE ");
+ if (WHERE_CLAUSE != null) {
+ sb.append(WHERE_CLAUSE + " AND ");
+ }
+ sb.append(keyColumn + " BETWEEN '" + lower + "' AND '" + upper + "'");
+ return sb.toString();
+ }
+
+ /**
+ * Helper function that constructs a query from the next highest key after an
+ * operator is restarted
+ */
+ public static String buildGTRangeQuery(String tableName, String keyColumn, String lower, String upper)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ");
+ if (COLUMN_LIST != null) {
+ sb.append(COLUMN_LIST);
+ } else {
+ sb.append("*");
+ }
+ sb.append(" from " + tableName + " WHERE ");
+ if (WHERE_CLAUSE != null) {
+ sb.append(WHERE_CLAUSE + " AND ");
+ }
+ sb.append(keyColumn + " > '" + lower + "' AND " + keyColumn + " <= '" + upper + "'");
+ return sb.toString();
+ }
+
+ /**
+ * Helper function that constructs a query for polling from outside the given
+ * range
+ */
+ public static String buildPollableQuery(String tableName, String keyColumn, String lower)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ");
+ if (COLUMN_LIST != null) {
+ sb.append(COLUMN_LIST);
+ } else {
+ sb.append("*");
+ }
+ sb.append(" from " + tableName + " WHERE ");
+ if (WHERE_CLAUSE != null) {
+ sb.append(WHERE_CLAUSE + " AND ");
+ }
+ sb.append(keyColumn + " > '" + lower + "' ");
+ return sb.toString();
+ }
+
+ /**
+ * Called by the partitioner from {@link - AbstractJdbcPollInputOperator}<br>
+ * Finds the range of query per partition<br>
+ * Returns a map of partitionId to PreparedStatement based on the range
+ * computed
+ *
+ * @throws SQLException
+ */
+ public static HashMap<Integer, KeyValPair<String, String>> getPartitionedQueryMap(int partitions, String dbDriver,
+ String dbConnection, String tableName, String key, String userName, String password, String whereClause,
+ String columnList) throws SQLException
+ {
+ long rowCount = 0L;
+ try {
+ DB_CONNECTION = dbConnection;
+ DB_USER = userName;
+ DB_PASSWORD = password;
+ TABLE_NAME = tableName;
+ KEY_COLUMN = key;
+ WHERE_CLAUSE = whereClause;
+ COLUMN_LIST = columnList;
+ DB_DRIVER = dbDriver;
+ rowCount = getRecordRange(generateQueryString());
+ } catch (SQLException e) {
+ LOG.error("Exception in getting the record range", e);
+ }
+ return getRangeQueries(partitions, getOffset(rowCount, partitions), rowCount);
+ }
+
+ /**
+ * Returns the rounded offset to arrive at a range query
+ */
+ private static int getOffset(long rowCount, int partitions)
+ {
+ if (rowCount % partitions == 0) {
+ return (int)(rowCount / partitions);
+ } else {
+ return (int)((rowCount - (rowCount % (partitions - 1))) / (partitions - 1));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4c7d268d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
new file mode 100644
index 0000000..45f96bf
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+/**
+ * A concrete implementation for {@link AbstractJdbcPollInputOperator}} for
+ * consuming data from MySQL using JDBC interface <br>
+ * User needs to provide tableName,dbConnection,setEmitColumnList,look-up key
+ * <br>
+ * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
+ * <br>
+ * This operator uses static partitioning to arrive at range queries for exactly
+ * once reads<br>
+ * Assumption is that there is an ordered column using which range queries can
+ * be formed<br>
+ * If an emitColumnList is provided, please ensure that the keyColumn is the
+ * first column in the list<br>
+ * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
+ * comma separated list of the emit columns eg columnA,columnB,columnC
+ *
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<Object>
+{
+ private long lastBatchWindowId;
+ private transient long currentWindowId;
+ private long lastCreationTsMillis;
+ private long fetchBackMillis = 0L;
+ private ArrayList<String> emitColumns;
+ private transient int count = 0;
+
+ /**
+ * Returns the emit columns
+ */
+ public List<String> getEmitColumns()
+ {
+ return emitColumns;
+ }
+
+ /**
+ * Sets the emit columns
+ */
+ public void setEmitColumns(ArrayList<String> emitColumns)
+ {
+ this.emitColumns = emitColumns;
+ }
+
+ /**
+ * Returns fetchBackMilis
+ */
+ public long getFetchBackMillis()
+ {
+ return fetchBackMillis;
+ }
+
+ /**
+ * Sets fetchBackMilis - used in polling
+ */
+ public void setFetchBackMillis(long fetchBackMillis)
+ {
+ this.fetchBackMillis = fetchBackMillis;
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ parseEmitColumnList(getEmitColumnList());
+ lastCreationTsMillis = System.currentTimeMillis() - fetchBackMillis;
+ }
+
+ private void parseEmitColumnList(String columnList)
+ {
+ String[] cols = columnList.split(",");
+ ArrayList<String> arr = Lists.newArrayList();
+ for (int i = 0; i < cols.length; i++) {
+ arr.add(cols[i]);
+ }
+ setEmitColumns(arr);
+ }
+
+ @Override
+ public void beginWindow(long l)
+ {
+ super.beginWindow(l);
+ currentWindowId = l;
+ }
+
+ @Override
+ protected void pollRecords(PreparedStatement ps)
+ {
+ ResultSet rs = null;
+ List<Object> metaList = new ArrayList<>();
+
+ if (isReplayed) {
+ return;
+ }
+
+ try {
+ if (ps.isClosed()) {
+ LOG.debug("Returning due to closed ps for non-pollable partitions");
+ return;
+ }
+ } catch (SQLException e) {
+ LOG.error("Prepared statement is closed", e);
+ throw new RuntimeException(e);
+ }
+
+ try (PreparedStatement pStat = ps;) {
+ pStat.setFetchSize(getFetchSize());
+ LOG.debug("sql query = {}", pStat);
+ rs = pStat.executeQuery();
+ boolean hasNext = false;
+
+ if (rs == null || rs.isClosed()) {
+ return;
+ }
+
+ while ((hasNext = rs.next())) {
+ Object key = null;
+ StringBuilder resultTuple = new StringBuilder();
+ try {
+ if (count < getBatchSize()) {
+ key = rs.getObject(getKey());
+ for (String obj : emitColumns) {
+ resultTuple.append(rs.getObject(obj) + ",");
+ }
+ metaList.add(resultTuple.substring(0, resultTuple.length() - 1));
+ count++;
+ } else {
+ emitQueue.add(metaList);
+ metaList = new ArrayList<>();
+ key = rs.getObject(getKey());
+ for (String obj : emitColumns) {
+ resultTuple.append(rs.getObject(obj) + ",");
+ }
+ metaList.add(resultTuple.substring(0, resultTuple.length() - 1));
+ count = 0;
+ }
+ } catch (NullPointerException npe) {
+ LOG.error("Key not found" + npe);
+ throw new RuntimeException(npe);
+ }
+ if (isPollable) {
+ highestPolled = key.toString();
+ isPolled = true;
+ }
+ }
+ /*Flush the remaining records once the result set is over and batch-size is not reached,
+ * Dont flush if its pollable*/
+ if (!hasNext) {
+ if ((isPollable && isPolled) || !isPollable) {
+ emitQueue.offer(metaList);
+ metaList = new ArrayList<>();
+ count = 0;
+ }
+ if (!isPolled) {
+ isPolled = true;
+ }
+ }
+ LOG.debug("last creation time stamp = {}", lastCreationTsMillis);
+ } catch (SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (isReplayed) {
+ LOG.debug(
+ "Calling emit tuples during window - " + currentWindowId + "::" + windowManager.getLargestRecoveryWindow());
+ LOG.debug("Returning for replayed window");
+ return;
+ }
+
+ List<Object> tuples;
+
+ if (lastBatchWindowId < currentWindowId) {
+ if ((tuples = emitQueue.poll()) != null) {
+ for (Object tuple : tuples) {
+ String[] str = tuple.toString().split(",");
+ if (lower == null) {
+ lower = str[0];
+ }
+ upper = str[0];
+ outputPort.emit(tuple);
+ }
+ lastBatchWindowId = currentWindowId;
+ }
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcPollInputOperator.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4c7d268d/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java
new file mode 100644
index 0000000..573e45d
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests for {@link AbstractJdbcPollInputOperator} and
+ * {@link JdbcPollInputOperator}
+ */
+public class JdbcPollerTest
+{
+ public static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+ public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+
+ private static final String TABLE_NAME = "test_account_table";
+ private static String APP_ID = "JdbcPollingOperatorTest";
+ public String dir = null;
+
+ @BeforeClass
+ public static void setup()
+ {
+ try {
+ cleanup();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ Class.forName(DB_DRIVER).newInstance();
+
+ Connection con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+
+ String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ + " (Account_No INTEGER, Name VARCHAR(255), Amount INTEGER)";
+ stmt.executeUpdate(createTable);
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @AfterClass
+ public static void cleanup()
+ {
+ try {
+ FileUtils.deleteDirectory(new File("target/" + APP_ID));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void cleanTable()
+ {
+ try {
+ Connection con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+ String cleanTable = "delete from " + TABLE_NAME;
+ stmt.executeUpdate(cleanTable);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void insertEventsInTable(int numEvents, int offset)
+ {
+ try {
+ Connection con = DriverManager.getConnection(URL);
+ String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
+ PreparedStatement stmt = con.prepareStatement(insert);
+ for (int i = 0; i < numEvents; i++, offset++) {
+ stmt.setInt(1, offset);
+ stmt.setString(2, "Account_Holder-" + offset);
+ stmt.setInt(3, (offset * 1000));
+ stmt.executeUpdate();
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Simulates actual application flow Adds a batch query partitiom, a pollable
+ * partition Incremental record polling is also checked
+ */
+ @Test
+ public void testJdbcPollingInputOperatorBatch() throws InterruptedException
+ {
+ cleanTable();
+ insertEventsInTable(10, 0);
+ JdbcStore store = new JdbcStore();
+ store.setDatabaseDriver(DB_DRIVER);
+ store.setDatabaseUrl(URL);
+
+ Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ this.dir = "target/" + APP_ID + "/";
+ attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ attributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
+
+ JdbcPollInputOperator inputOperator = new JdbcPollInputOperator();
+ inputOperator.setStore(store);
+ inputOperator.setBatchSize(100);
+ inputOperator.setPollInterval(1000);
+ inputOperator.setEmitColumnList("Account_No,Name,Amount");
+ inputOperator.setKey("Account_No");
+ inputOperator.setTableName(TABLE_NAME);
+ inputOperator.setFetchSize(100);
+ inputOperator.setPartitionCount(1);
+
+ CollectorTestSink<Object> sink = new CollectorTestSink<>();
+ inputOperator.outputPort.setSink(sink);
+
+ TestUtils.MockBatchedOperatorStats readerStats = new TestUtils.MockBatchedOperatorStats(2);
+
+ DefaultPartition<AbstractJdbcPollInputOperator<Object>> apartition = new DefaultPartition<AbstractJdbcPollInputOperator<Object>>(
+ inputOperator);
+
+ TestUtils.MockPartition<AbstractJdbcPollInputOperator<Object>> pseudoParttion = new TestUtils.MockPartition<>(
+ apartition, readerStats);
+
+ List<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newMocks = Lists.newArrayList();
+
+ newMocks.add(pseudoParttion);
+
+ Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newPartitions = inputOperator
+ .definePartitions(newMocks, null);
+
+ Iterator<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> itr = newPartitions
+ .iterator();
+
+ int operatorId = 0;
+ for (com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>> partition : newPartitions) {
+
+ Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ this.dir = "target/" + APP_ID + "/";
+ partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
+
+ OperatorContextTestHelper.TestIdOperatorContext partitioningContext = new OperatorContextTestHelper.TestIdOperatorContext(
+ operatorId++, partitionAttributeMap);
+
+ partition.getPartitionedInstance().setup(partitioningContext);
+ partition.getPartitionedInstance().activate(partitioningContext);
+ }
+
+ //First partition is for range queries,last is for polling queries
+ AbstractJdbcPollInputOperator<Object> newInstance = itr.next().getPartitionedInstance();
+ CollectorTestSink<Object> sink1 = new CollectorTestSink<>();
+ newInstance.outputPort.setSink(sink1);
+ newInstance.beginWindow(1);
+ Thread.sleep(50);
+ newInstance.emitTuples();
+ newInstance.endWindow();
+
+ Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
+ int i = 0;
+ for (Object tuple : sink1.collectedTuples) {
+ String[] pojoEvent = tuple.toString().split(",");
+ Assert.assertTrue("i=" + i, Integer.parseInt(pojoEvent[0]) == i ? true : false);
+ i++;
+ }
+ sink1.collectedTuples.clear();
+
+ insertEventsInTable(10, 10);
+
+ AbstractJdbcPollInputOperator<Object> pollableInstance = itr.next().getPartitionedInstance();
+
+ pollableInstance.outputPort.setSink(sink1);
+
+ pollableInstance.beginWindow(1);
+ Thread.sleep(pollableInstance.getPollInterval());
+ pollableInstance.emitTuples();
+ pollableInstance.endWindow();
+
+
+ Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
+ i = 10;
+ for (Object tuple : sink1.collectedTuples) {
+ String[] pojoEvent = tuple.toString().split(",");
+ Assert.assertTrue("i=" + i, Integer.parseInt(pojoEvent[0]) == i ? true : false);
+ i++;
+ }
+
+ sink1.collectedTuples.clear();
+ insertEventsInTable(10, 20);
+
+ pollableInstance.beginWindow(2);
+ Thread.sleep(pollableInstance.getPollInterval());
+ pollableInstance.emitTuples();
+ pollableInstance.endWindow();
+
+ Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
+
+ i = 20;
+ for (Object tuple : sink1.collectedTuples) {
+ String[] pojoEvent = tuple.toString().split(",");
+ Assert.assertTrue("i=" + i, Integer.parseInt(pojoEvent[0]) == i ? true : false);
+ i++;
+ }
+ sink1.collectedTuples.clear();
+ }
+
+}
[2/2] apex-malhar git commit: Merge branch
'APEXMALHAR-2066-JdbcPolling' of
https://github.com/devtagare/incubator-apex-malhar
Posted by bh...@apache.org.
Merge branch 'APEXMALHAR-2066-JdbcPolling' of https://github.com/devtagare/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/aaa4464f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/aaa4464f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/aaa4464f
Branch: refs/heads/master
Commit: aaa4464f0bb03db4aca5f6efd9658c0a791d0b42
Parents: 1320128 4c7d268
Author: bhupesh <bh...@gmail.com>
Authored: Fri Jul 15 09:55:47 2016 +0530
Committer: bhupesh <bh...@gmail.com>
Committed: Fri Jul 15 09:55:47 2016 +0530
----------------------------------------------------------------------
.../db/jdbc/AbstractJdbcPollInputOperator.java | 661 +++++++++++++++++++
.../lib/db/jdbc/JdbcMetaDataUtility.java | 353 ++++++++++
.../lib/db/jdbc/JdbcPollInputOperator.java | 231 +++++++
.../datatorrent/lib/db/jdbc/JdbcPollerTest.java | 246 +++++++
4 files changed, 1491 insertions(+)
----------------------------------------------------------------------