You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2015/09/10 22:19:21 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1836 #comment Schema
integration with JDBC operators
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 59f21fb5d -> b50165270
MLHR-1836 #comment Schema integration with JDBC operators
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/37438dea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/37438dea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/37438dea
Branch: refs/heads/devel-3
Commit: 37438dea31e45b4cf0b7c17986334efb9220891f
Parents: d910102
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Fri Jul 31 17:43:45 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Wed Sep 9 18:55:23 2015 -0700
----------------------------------------------------------------------
.../lib/db/jdbc/JdbcPOJOInputOperator.java | 620 +++++++++++++++++++
.../lib/db/jdbc/JdbcPOJOOutputOperator.java | 315 ++++++----
.../com/datatorrent/lib/db/jdbc/JdbcStore.java | 21 +-
.../lib/db/jdbc/JdbcOperatorTest.java | 164 +++--
.../datatorrent/lib/helper/TestPortContext.java | 59 ++
pom.xml | 3 +
6 files changed, 1000 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37438dea/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
new file mode 100644
index 0000000..405d8f9
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
@@ -0,0 +1,620 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.math.BigDecimal;
+import java.sql.*;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * A concrete input operator that reads from a database through the JDBC API.<br/>
+ * This operator by default uses the {@link #fieldInfos} and the table name to construct the sql query.<br/>
+ *
+ * A user can provide there own query by setting the {@link #query} property which takes precedence.<br/>
+ *
+ * For eg. user can set the query property to a complex one : "select x1, x2 from t1, t2 where t1.x3 = t2.x3 ;"<br/>
+ *
+ * This implementation is generic so it uses offset/limit mechanism for batching which is not optimal. Batching is most efficient
+ * when the tables/views are indexed and the query uses this information to retrieve data.<br/>
+ * This can be achieved in sub-classes by overriding {@link #queryToRetrieveData()} and {@link #setRuntimeParams()}.
+ *
+ * @displayName Jdbc Input Operator
+ * @category Input
+ * @tags database, sql, pojo, jdbc
+ * @since 2.1.0
+ */
+public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> implements Operator.ActivationListener<Context.OperatorContext>
+{
+ private static int DEF_FETCH_SIZE = 100;
+
+ //table and query definition
+ private String tableName;
+ private String whereCondition;
+ private String groupByClause;
+ private String havingCondition;
+ private String orderByExpr;
+ private String query;
+ private boolean mysqlSyntax;
+
+ @NotNull
+ private List<FieldInfo> fieldInfos;
+
+ @Min(1)
+ private int fetchSize;
+ private int fetchDirection;
+
+ private final transient List<ActiveFieldInfo> columnFieldSetters;
+ private transient boolean windowDone;
+
+ protected String columnsExpression;
+ protected List<Integer> columnDataTypes;
+
+ private transient PreparedStatement preparedStatement;
+ protected transient Class<?> pojoClass;
+
+ protected int pageNumber;
+ private transient long sleepMillis;
+
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
+ {
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+ };
+
+ public JdbcPOJOInputOperator()
+ {
+ super();
+ fetchSize = DEF_FETCH_SIZE;
+ columnFieldSetters = Lists.newArrayList();
+ mysqlSyntax = true;
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ Preconditions.checkArgument(query != null || tableName != null, "both query and table name are not set");
+ sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+ super.setup(context);
+
+ try {
+ //closing the query statement in super class as it is not needed
+ queryStatement.close();
+ if (query == null && columnsExpression == null) {
+ StringBuilder columns = new StringBuilder();
+ for (int i = 0; i < fieldInfos.size(); i++) {
+ columns.append(fieldInfos.get(i).getColumnName());
+ if (i < fieldInfos.size() - 1) {
+ columns.append(",");
+ }
+ }
+ columnsExpression = columns.toString();
+ LOG.debug("select expr {}", columnsExpression);
+ }
+
+ preparedStatement = store.connection.prepareStatement(queryToRetrieveData());
+ if (columnDataTypes == null) {
+ populateColumnDataTypes();
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ for (FieldInfo fi : fieldInfos) {
+ columnFieldSetters.add(new ActiveFieldInfo(fi));
+ }
+ }
+
+ protected void populateColumnDataTypes() throws SQLException
+ {
+ columnDataTypes = Lists.newArrayList();
+ preparedStatement.setMaxRows(0);
+ setRuntimeParams();
+ try (ResultSet rs = preparedStatement.executeQuery()) {
+ Map<String, Integer> nameToType = Maps.newHashMap();
+ ResultSetMetaData rsMetaData = rs.getMetaData();
+ LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
+
+ for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
+ int type = rsMetaData.getColumnType(i);
+ String name = rsMetaData.getColumnName(i);
+ LOG.debug("column name {} type {}", name, type);
+ if (query == null) {
+ columnDataTypes.add(type);
+ } else {
+ //when it is a custom query we need to ensure the types are in the same order as field infos
+ nameToType.put(name, type);
+ }
+ }
+
+ if (query != null) {
+ for (FieldInfo fieldInfo : fieldInfos) {
+ columnDataTypes.add(nameToType.get(fieldInfo.getColumnName()));
+ }
+ }
+ }
+ preparedStatement.setFetchSize(fetchSize);
+ preparedStatement.setMaxRows(fetchSize);
+ }
+
+ @Override
+ public void beginWindow(long l)
+ {
+ windowDone = false;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (!windowDone) {
+ try {
+ setRuntimeParams();
+ ResultSet resultSet = preparedStatement.executeQuery();
+ if (resultSet.next()) {
+ do {
+ Object tuple = getTuple(resultSet);
+ outputPort.emit(tuple);
+ }
+ while (resultSet.next());
+ } else {
+ windowDone = true;
+ }
+ resultSet.close();
+ pageNumber++;
+ } catch (SQLException ex) {
+ store.disconnect();
+ throw new RuntimeException(ex);
+ }
+ } else {
+ try {
+ Thread.sleep(sleepMillis);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ protected void setRuntimeParams() throws SQLException
+ {
+ if (mysqlSyntax) {
+ preparedStatement.setLong(1, pageNumber * fetchSize);
+ } else {
+ preparedStatement.setLong(1, pageNumber * fetchSize);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object getTuple(ResultSet result)
+ {
+ Object obj;
+ try {
+ obj = pojoClass.newInstance();
+ } catch (InstantiationException | IllegalAccessException ex) {
+ store.disconnect();
+ throw new RuntimeException(ex);
+ }
+
+ try {
+ for (int i = 0; i < fieldInfos.size(); i++) {
+ int type = columnDataTypes.get(i);
+ ActiveFieldInfo afi = columnFieldSetters.get(i);
+
+ switch (type) {
+ case Types.CHAR:
+ case Types.VARCHAR:
+ String strVal = result.getString(i + 1);
+ ((PojoUtils.Setter<Object, String>)afi.setterOrGetter).set(obj, strVal);
+ break;
+
+ case Types.BOOLEAN:
+ boolean boolVal = result.getBoolean(i + 1);
+ ((PojoUtils.SetterBoolean<Object>)afi.setterOrGetter).set(obj, boolVal);
+ break;
+
+ case Types.TINYINT:
+ byte byteVal = result.getByte(i + 1);
+ ((PojoUtils.SetterByte<Object>)afi.setterOrGetter).set(obj, byteVal);
+ break;
+
+ case Types.SMALLINT:
+ short shortVal = result.getShort(i + 1);
+ ((PojoUtils.SetterShort<Object>)afi.setterOrGetter).set(obj, shortVal);
+ break;
+
+ case Types.INTEGER:
+ int intVal = result.getInt(i + 1);
+ ((PojoUtils.SetterInt<Object>)afi.setterOrGetter).set(obj, intVal);
+ break;
+
+ case Types.BIGINT:
+ long longVal = result.getLong(i + 1);
+ ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(obj, longVal);
+ break;
+
+ case Types.FLOAT:
+ float floatVal = result.getFloat(i + 1);
+ ((PojoUtils.SetterFloat<Object>)afi.setterOrGetter).set(obj, floatVal);
+ break;
+
+ case Types.DOUBLE:
+ double doubleVal = result.getDouble(i + 1);
+ ((PojoUtils.SetterDouble<Object>)afi.setterOrGetter).set(obj, doubleVal);
+ break;
+
+ case Types.DECIMAL:
+ BigDecimal bdVal = result.getBigDecimal(i + 1);
+ ((PojoUtils.Setter<Object, BigDecimal>)afi.setterOrGetter).set(obj, bdVal);
+ break;
+
+ case Types.TIMESTAMP:
+ Timestamp tsVal = result.getTimestamp(i + 1);
+ ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(obj, tsVal.getTime());
+ break;
+
+ case Types.TIME:
+ Time timeVal = result.getTime(i + 1);
+ ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(obj, timeVal.getTime());
+ break;
+
+ case Types.DATE:
+ Date dateVal = result.getDate(i + 1);
+ ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(obj, dateVal.getTime());
+ break;
+
+ default:
+ handleUnknownDataType(type, obj, afi);
+ break;
+ }
+ }
+ return obj;
+ } catch (SQLException e) {
+ store.disconnect();
+ throw new RuntimeException("fetching metadata", e);
+ }
+ }
+
+ @SuppressWarnings("UnusedParameters")
+ protected void handleUnknownDataType(int type, Object tuple, ActiveFieldInfo activeFieldInfo)
+ {
+ throw new RuntimeException("unsupported data type " + type);
+ }
+
+ @Override
+ public String queryToRetrieveData()
+ {
+ StringBuilder builder = new StringBuilder();
+
+ if (query != null) {
+ builder.append(query.trim());
+ if (builder.charAt(builder.length() - 1) == ';') {
+ builder.deleteCharAt(builder.length() - 1);
+ }
+ } else {
+ builder.append("SELECT ").append(columnsExpression).append(" FROM ").append(tableName);
+ if (whereCondition != null) {
+ builder.append(" WHERE ").append(whereCondition);
+ }
+ if (groupByClause != null) {
+ builder.append(" GROUP BY ").append(groupByClause);
+ if (havingCondition != null) {
+ builder.append(" HAVING ").append(havingCondition);
+ }
+ }
+ if (orderByExpr != null) {
+ builder.append(" ORDER BY ").append(orderByExpr);
+ }
+ }
+ if (mysqlSyntax) {
+ builder.append(" LIMIT ").append(fetchSize).append(" OFFSET ?");
+ } else {
+ builder.append(" OFFSET ? ROWS FETCH NEXT ").append(fetchSize).append(" ROWS ONLY");
+ }
+ builder.append(";");
+ String queryStr = builder.toString();
+ LOG.debug("built query {}", queryStr);
+ return queryStr;
+ }
+
+ @Override
+ public void activate(Context.OperatorContext context)
+ {
+ for (int i = 0; i < columnDataTypes.size(); i++) {
+ final int type = columnDataTypes.get(i);
+ JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldSetters.get(i);
+ switch (type) {
+ case (Types.CHAR):
+ case (Types.VARCHAR):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+ String.class);
+ break;
+
+ case (Types.BOOLEAN):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.TINYINT):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.SMALLINT):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.INTEGER):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.BIGINT):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.FLOAT):
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.DOUBLE):
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case Types.DECIMAL:
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+ BigDecimal.class);
+ break;
+
+ case Types.TIMESTAMP:
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case Types.TIME:
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case Types.DATE:
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ default:
+ handleUnknownDataType(type, null, activeFieldInfo);
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void deactivate()
+ {
+ }
+
+ protected static class ActiveFieldInfo
+ {
+ final FieldInfo fieldInfo;
+ Object setterOrGetter;
+
+ ActiveFieldInfo(FieldInfo fieldInfo)
+ {
+ this.fieldInfo = fieldInfo;
+ }
+ }
+
+ /**
+ * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
+ */
+ public List<FieldInfo> getFieldInfos()
+ {
+ return fieldInfos;
+ }
+
+ /**
+ * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/>
+ * The value from fieldInfo.column is assigned to fieldInfo.pojoFieldExpression.
+ *
+ * @description $[].columnName name of the database column name
+ * @description $[].pojoFieldExpression pojo field name or expression
+ * @useSchema $[].pojoFieldExpression outputPort.fields[].name
+ */
+ public void setFieldInfos(List<FieldInfo> fieldInfos)
+ {
+ this.fieldInfos = fieldInfos;
+ }
+
+ /**
+ * @return table name
+ */
+ public String getTableName()
+ {
+ return tableName;
+ }
+
+ /**
+ * Sets the table name.
+ *
+ * @param tableName table name
+ */
+ public void setTableName(String tableName)
+ {
+ this.tableName = tableName;
+ }
+
+ /**
+ * @return where condition
+ */
+ public String getWhereCondition()
+ {
+ return whereCondition;
+ }
+
+ /**
+ * Sets the where condition.
+ *
+ * @param whereCondition where condition.
+ */
+ public void setWhereCondition(String whereCondition)
+ {
+ this.whereCondition = whereCondition;
+ }
+
+ /**
+ * @return group-by clause
+ */
+ public String getGroupByClause()
+ {
+ return groupByClause;
+ }
+
+ /**
+ * Sets the group by clause.
+ *
+ * @param groupByClause group-by clause.
+ */
+ public void setGroupByClause(String groupByClause)
+ {
+ this.groupByClause = groupByClause;
+ }
+
+ /**
+ * @return having condition
+ */
+ public String getHavingCondition()
+ {
+ return havingCondition;
+ }
+
+ /**
+ * Sets the having condition.
+ *
+ * @param havingCondition having condition
+ */
+ public void setHavingCondition(String havingCondition)
+ {
+ this.havingCondition = havingCondition;
+ }
+
+ /**
+ * @return order by expression.
+ */
+ public String getOrderByExpr()
+ {
+ return orderByExpr;
+ }
+
+ /**
+ * Sets the order by expression.
+ *
+ * @param orderByExpr order by expression.
+ */
+ public void setOrderByExpr(String orderByExpr)
+ {
+ this.orderByExpr = orderByExpr;
+ }
+
+ /**
+ * @return query
+ */
+ public String getQuery()
+ {
+ return query;
+ }
+
+ /**
+ * Sets the query
+ *
+ * @param query query
+ */
+ public void setQuery(String query)
+ {
+ this.query = query;
+ }
+
+ /**
+ * @return fetch size which is the number of rows retrieved from the database in a window.
+ */
+ public int getFetchSize()
+ {
+ return fetchSize;
+ }
+
+ /**
+ * Sets the fetch size which is the number of rows retrieved from the database in a window.
+ *
+ * @param fetchSize number of rows retrieved from db in a window.
+ */
+ public void setFetchSize(int fetchSize)
+ {
+ this.fetchSize = fetchSize;
+ }
+
+ /**
+ * @return fetch direction
+ */
+ public int getFetchDirection()
+ {
+ return fetchDirection;
+ }
+
+ /**
+ * This sets the direction used in processing a result set. It allows the JDBC driver to optimize its processing.
+ * Refer {@link PreparedStatement#setFetchDirection(int)}
+ *
+ * @param fetchDirection fetch direction
+ */
+ public void setFetchDirection(int fetchDirection)
+ {
+ this.fetchDirection = fetchDirection;
+ }
+
+ /**
+ * @return is syntax mysql
+ */
+ public boolean isMysqlSyntax()
+ {
+ return mysqlSyntax;
+ }
+
+ /**
+ * Sets the syntax of the query.
+ *
+ * @param mysqlSyntax true if it is mySql; when false oracle syntax is used.
+ */
+ public void setMysqlSyntax(boolean mysqlSyntax)
+ {
+ this.mysqlSyntax = mysqlSyntax;
+ }
+
+ public static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOInputOperator.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37438dea/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
index 7451a50..35fcf0f 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
@@ -15,19 +15,25 @@
*/
package com.datatorrent.lib.db.jdbc;
+import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+
+import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.lib.util.PojoUtils.Getter;
import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
-import com.datatorrent.lib.util.PojoUtils.GetterChar;
import com.datatorrent.lib.util.PojoUtils.GetterDouble;
import com.datatorrent.lib.util.PojoUtils.GetterFloat;
import com.datatorrent.lib.util.PojoUtils.GetterInt;
import com.datatorrent.lib.util.PojoUtils.GetterLong;
import com.datatorrent.lib.util.PojoUtils.GetterShort;
+import java.math.BigDecimal;
import java.sql.*;
-import java.util.ArrayList;
+import java.util.List;
import javax.validation.constraints.NotNull;
@@ -35,6 +41,8 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+
/**
* <p>
* JdbcPOJOOutputOperator class.</p>
@@ -46,71 +54,48 @@ import org.slf4j.LoggerFactory;
* @since 2.1.0
*/
@Evolving
-public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
+public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object> implements Operator.ActivationListener<OperatorContext>
{
@NotNull
- private ArrayList<String> dataColumns;
- //These are extracted from table metadata
- private ArrayList<Integer> columnDataTypes;
-
- /*
- * An arraylist of data column names to be set in database.
- * Gets column names.
- */
- public ArrayList<String> getDataColumns()
- {
- return dataColumns;
- }
+ private List<FieldInfo> fieldInfos;
- public void setDataColumns(ArrayList<String> dataColumns)
- {
- this.dataColumns = dataColumns;
- }
+ private List<Integer> columnDataTypes;
@NotNull
private String tablename;
- /*
- * Gets the Tablename in database.
- */
- public String getTablename()
- {
- return tablename;
- }
+ private final transient List<JdbcPOJOInputOperator.ActiveFieldInfo> columnFieldGetters;
- public void setTablename(String tablename)
- {
- this.tablename = tablename;
- }
+ private String insertStatement;
- /*
- * An ArrayList of Java expressions that will yield the field value from the POJO.
- * Each expression corresponds to one column in the database table.
- */
- public ArrayList<String> getExpressions()
- {
- return expressions;
- }
+ private transient Class<?> pojoClass;
- public void setExpressions(ArrayList<String> expressions)
+ @InputPortFieldAnnotation(optional = true, schemaRequired = true)
+ public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
{
- this.expressions = expressions;
- }
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
- @NotNull
- private ArrayList<String> expressions;
- private transient ArrayList<Object> getters;
- private String insertStatement;
+ @Override
+ public void process(Object t)
+ {
+ JdbcPOJOOutputOperator.super.input.process(t);
+ }
+
+ };
@Override
public void setup(OperatorContext context)
{
- StringBuilder columns = new StringBuilder("");
- StringBuilder values = new StringBuilder("");
- for (int i = 0; i < dataColumns.size(); i++) {
- columns.append(dataColumns.get(i));
+ StringBuilder columns = new StringBuilder();
+ StringBuilder values = new StringBuilder();
+ for (int i = 0; i < fieldInfos.size(); i++) {
+ columns.append(fieldInfos.get(i).getColumnName());
values.append("?");
- if (i < dataColumns.size() - 1) {
+ if (i < fieldInfos.size() - 1) {
columns.append(",");
values.append(",");
}
@@ -120,155 +105,227 @@ public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOpe
+ " (" + columns.toString() + ")"
+ " VALUES (" + values.toString() + ")";
LOG.debug("insert statement is {}", insertStatement);
+
super.setup(context);
- Connection conn = store.getConnection();
- LOG.debug("Got Connection.");
- try {
- Statement st = conn.createStatement();
- ResultSet rs = st.executeQuery("select * from " + tablename);
- ResultSetMetaData rsMetaData = rs.getMetaData();
+ if (columnDataTypes == null) {
+ try {
+ populateColumnDataTypes(columns.toString());
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
- int numberOfColumns = 0;
+ for (FieldInfo fi : fieldInfos) {
+ columnFieldGetters.add(new JdbcPOJOInputOperator.ActiveFieldInfo(fi));
+ }
+ }
- numberOfColumns = rsMetaData.getColumnCount();
+ protected void populateColumnDataTypes(String columns) throws SQLException
+ {
+ columnDataTypes = Lists.newArrayList();
+ try (Statement st = store.getConnection().createStatement()) {
+ ResultSet rs = st.executeQuery("select " + columns + " from " + tablename);
- LOG.debug("resultSet MetaData column Count=" + numberOfColumns);
+ ResultSetMetaData rsMetaData = rs.getMetaData();
+ LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
- for (int i = 1; i <= numberOfColumns; i++) {
- // get the designated column's SQL type.
+ for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
int type = rsMetaData.getColumnType(i);
- LOG.debug("column name {}", rsMetaData.getColumnTypeName(i));
columnDataTypes.add(type);
- LOG.debug("sql column type is " + type);
+ LOG.debug("column name {} type {}", rsMetaData.getColumnName(i), type);
}
}
- catch (SQLException ex) {
- throw new RuntimeException(ex);
- }
-
}
public JdbcPOJOOutputOperator()
{
super();
- columnDataTypes = new ArrayList<Integer>();
- getters = new ArrayList<Object>();
+ columnFieldGetters = Lists.newArrayList();
}
@Override
- public void processTuple(Object tuple)
+ protected String getUpdateCommand()
{
- if (getters.isEmpty()) {
- processFirstTuple(tuple);
- }
- super.processTuple(tuple);
+ LOG.debug("insert statement is {}", insertStatement);
+ return insertStatement;
}
- public void processFirstTuple(Object tuple)
+ @Override
+ @SuppressWarnings("unchecked")
+ protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
{
- final Class<?> fqcn = tuple.getClass();
final int size = columnDataTypes.size();
for (int i = 0; i < size; i++) {
final int type = columnDataTypes.get(i);
- final String getterExpression = expressions.get(i);
- final Object getter;
+ JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i);
switch (type) {
- case Types.CHAR:
- getter = PojoUtils.createGetterChar(fqcn, getterExpression);
+ case (Types.CHAR):
+ case (Types.VARCHAR):
+ statement.setString(i + 1, ((Getter<Object, String>) activeFieldInfo.setterOrGetter).get(tuple));
break;
- case Types.VARCHAR:
- getter = PojoUtils.createGetter(fqcn, getterExpression, String.class);
+
+ case (Types.BOOLEAN):
+ statement.setBoolean(i + 1, ((GetterBoolean<Object>) activeFieldInfo.setterOrGetter).get(tuple));
break;
- case Types.BOOLEAN:
- case Types.TINYINT:
- getter = PojoUtils.createGetterBoolean(fqcn, getterExpression);
+
+ case (Types.TINYINT):
+ statement.setByte(i + 1, ((PojoUtils.GetterByte<Object>) activeFieldInfo.setterOrGetter).get(tuple));
break;
- case Types.SMALLINT:
- getter = PojoUtils.createGetterShort(fqcn, getterExpression);
+
+ case (Types.SMALLINT):
+ statement.setShort(i + 1, ((GetterShort<Object>) activeFieldInfo.setterOrGetter).get(tuple));
break;
- case Types.INTEGER:
- getter = PojoUtils.createGetterInt(fqcn, getterExpression);
+
+ case (Types.INTEGER):
+ statement.setInt(i + 1, ((GetterInt<Object>) activeFieldInfo.setterOrGetter).get(tuple));
break;
- case Types.BIGINT:
- getter = PojoUtils.createGetterLong(fqcn, getterExpression);
+
+ case (Types.BIGINT):
+ statement.setLong(i + 1, ((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple));
break;
- case Types.FLOAT:
- getter = PojoUtils.createGetterFloat(fqcn, getterExpression);
+
+ case (Types.FLOAT):
+ statement.setFloat(i + 1, ((GetterFloat<Object>) activeFieldInfo.setterOrGetter).get(tuple));
break;
- case Types.DOUBLE:
- getter = PojoUtils.createGetterDouble(fqcn, getterExpression);
+
+ case (Types.DOUBLE):
+ statement.setDouble(i + 1, ((GetterDouble<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ break;
+
+ case Types.DECIMAL:
+ statement.setBigDecimal(i + 1, ((Getter<Object, BigDecimal>) activeFieldInfo.setterOrGetter).get(tuple));
+ break;
+
+ case Types.TIMESTAMP:
+ statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
+ break;
+
+ case Types.TIME:
+ statement.setTime(i + 1, new Time(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
break;
+
+ case Types.DATE:
+ statement.setDate(i + 1, new Date(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
+ break;
+
default:
- /*
- Types.DECIMAL
- Types.DATE
- Types.TIME
- Types.ARRAY
- Types.OTHER
- */
- getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class);
+ handleUnknownDataType(type, tuple, activeFieldInfo);
break;
}
- getters.add(getter);
}
+ }
+ @SuppressWarnings("UnusedParameters")
+ protected void handleUnknownDataType(int type, Object tuple, JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo)
+ {
+ throw new RuntimeException("unsupported data type " + type);
}
- @Override
- protected String getUpdateCommand()
+ /**
+ * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
+ */
+ public List<FieldInfo> getFieldInfos()
{
- LOG.debug("insertstatement is {}", insertStatement);
- return insertStatement;
+ return fieldInfos;
+ }
+
+ /**
+ * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/>
+ * The value from fieldInfo.column is assigned to fieldInfo.pojoFieldExpression.
+ *
+ * @description $[].columnName name of the database column name
+ * @description $[].pojoFieldExpression pojo field name or expression
+ * @useSchema $[].pojoFieldExpression input.fields[].name
+ */
+ public void setFieldInfos(List<FieldInfo> fieldInfos)
+ {
+ this.fieldInfos = fieldInfos;
+ }
+
+ /*
+ * Gets the name of the table in database.
+ */
+ public String getTablename()
+ {
+ return tablename;
+ }
+
+ public void setTablename(String tablename)
+ {
+ this.tablename = tablename;
}
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOOutputOperator.class);
+
@Override
- @SuppressWarnings("unchecked")
- protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
+ public void activate(OperatorContext context)
{
final int size = columnDataTypes.size();
for (int i = 0; i < size; i++) {
final int type = columnDataTypes.get(i);
+ JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i);
switch (type) {
case (Types.CHAR):
- statement.setString(i + 1, ((Getter<Object, String>)getters.get(i)).get(tuple));
- break;
case (Types.VARCHAR):
- statement.setString(i + 1, ((Getter<Object, String>)getters.get(i)).get(tuple));
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+ String.class);
break;
+
case (Types.BOOLEAN):
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
case (Types.TINYINT):
- statement.setBoolean(i + 1, ((GetterBoolean<Object>)getters.get(i)).get(tuple));
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
+
case (Types.SMALLINT):
- statement.setShort(i + 1, ((GetterShort<Object>)getters.get(i)).get(tuple));
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
+
case (Types.INTEGER):
- statement.setInt(i + 1, ((GetterInt<Object>)getters.get(i)).get(tuple));
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
+
case (Types.BIGINT):
- statement.setLong(i + 1, ((GetterLong<Object>)getters.get(i)).get(tuple));
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
+
case (Types.FLOAT):
- statement.setFloat(i + 1, ((GetterFloat<Object>)getters.get(i)).get(tuple));
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
+
case (Types.DOUBLE):
- statement.setDouble(i + 1, ((GetterDouble<Object>)getters.get(i)).get(tuple));
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case Types.DECIMAL:
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+ BigDecimal.class);
+ break;
+
+ case Types.TIMESTAMP:
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case Types.TIME:
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
+
+ case Types.DATE:
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
default:
- /*
- Types.DECIMAL
- Types.DATE
- Types.TIME
- Types.ARRAY
- Types.OTHER
- */
- statement.setObject(i + 1, ((Getter<Object, Object>)getters.get(i)).get(tuple));
+ handleUnknownDataType(type, null, activeFieldInfo);
break;
}
}
}
- private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOOutputOperator.class);
-
+ @Override
+ public void deactivate()
+ {
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37438dea/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
index 82e613a..4c4c004 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
@@ -44,7 +44,7 @@ public class JdbcStore implements Connectable
private String databaseUrl;
@NotNull
private String databaseDriver;
- private final Properties connectionProperties;
+ private Properties connectionProperties;
protected transient Connection connection = null;
/*
@@ -115,10 +115,10 @@ public class JdbcStore implements Connectable
}
/**
- * Connection Properties for JDBC Connection.
- * Sets the properties on the jdbc connection.
+ * Sets the connection properties on JDBC connection. Connection properties are provided as a string.
+ *
* @param connectionProps Comma separated list of properties. Property key and value are separated by colon.
- * eg. user:xyz,password:ijk
+ * eg. user:xyz,password:ijk
*/
public void setConnectionProperties(String connectionProps)
{
@@ -131,8 +131,17 @@ public class JdbcStore implements Connectable
}
/**
- * Connection Properties for JDBC Connection.
- * Gets the properties on the jdbc connection.
+ * Sets the connection properties on JDBC connection.
+ *
+ * @param connectionProperties connection properties.
+ */
+ public void setConnectionProperties(Properties connectionProperties)
+ {
+ this.connectionProperties = connectionProperties;
+ }
+
+ /**
+ * Get the connection properties of JDBC connection.
*/
public Properties getConnectionProperties()
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37438dea/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
index 4cca31f..d7f8690 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -26,12 +26,15 @@ import org.junit.Test;
import com.google.common.collect.Lists;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
-import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.helper.TestPortContext;
import com.datatorrent.lib.testbench.CollectorTestSink;
-import java.util.ArrayList;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.netlet.util.DTThrowable;
/**
* Tests for {@link AbstractJdbcTransactionableOutputOperator} and {@link AbstractJdbcInputOperator}
@@ -103,19 +106,18 @@ public class JdbcOperatorTest
Statement stmt = con.createStatement();
String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
- + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
- + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
- + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
- + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
- + ")";
+ + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+ + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+ + ")";
stmt.executeUpdate(createMetaTable);
String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INTEGER)";
stmt.executeUpdate(createTable);
String createPOJOTable = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME + "(id INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id ))";
stmt.executeUpdate(createPOJOTable);
- }
- catch (Throwable e) {
+ } catch (Throwable e) {
DTThrowable.rethrow(e);
}
}
@@ -131,8 +133,23 @@ public class JdbcOperatorTest
cleanTable = "delete from " + JdbcTransactionalStore.DEFAULT_META_TABLE;
stmt.executeUpdate(cleanTable);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
}
- catch (SQLException e) {
+ }
+
+ public static void insertEventsInTable(int numEvents)
+ {
+ try {
+ Connection con = DriverManager.getConnection(URL);
+ String insert = "insert into " + TABLE_NAME + " values (?)";
+ PreparedStatement stmt = con.prepareStatement(insert);
+
+ for (int i = 0; i < numEvents; i++) {
+ stmt.setInt(1, i);
+ stmt.executeUpdate();
+ }
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -170,8 +187,7 @@ public class JdbcOperatorTest
ResultSet resultSet = stmt.executeQuery(countQuery);
resultSet.next();
return resultSet.getInt(1);
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("fetching count", e);
}
}
@@ -195,8 +211,7 @@ public class JdbcOperatorTest
ResultSet resultSet = stmt.executeQuery(countQuery);
resultSet.next();
return resultSet.getInt(1);
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("fetching count", e);
}
}
@@ -218,8 +233,7 @@ public class JdbcOperatorTest
{
try {
return new TestEvent(result.getInt(1));
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -229,23 +243,6 @@ public class JdbcOperatorTest
{
return retrieveQuery;
}
-
- public void insertEventsInTable(int numEvents)
- {
- try {
- Connection con = DriverManager.getConnection(URL);
- String insert = "insert into " + TABLE_NAME + " values (?)";
- PreparedStatement stmt = con.prepareStatement(insert);
-
- for (int i = 0; i < numEvents; i++) {
- stmt.setInt(1, i);
- stmt.executeUpdate();
- }
- }
- catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
}
@Test
@@ -281,7 +278,7 @@ public class JdbcOperatorTest
}
@Test
- public void testJdbcPOJOOutputOperator()
+ public void testJdbcPojoOutputOperator()
{
JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
transactionalStore.setDatabaseDriver(DB_DRIVER);
@@ -294,25 +291,30 @@ public class JdbcOperatorTest
TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
outputOperator.setBatchSize(3);
outputOperator.setTablename(TABLE_POJO_NAME);
- ArrayList<String> dataColumns = new ArrayList<String>();
- dataColumns.add("id");
- dataColumns.add("name");
- outputOperator.setDataColumns(dataColumns);
+
+ List<FieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new FieldInfo("ID", "id", null));
+ fieldInfos.add(new FieldInfo("NAME", "name", null));
+ outputOperator.setFieldInfos(fieldInfos);
+
outputOperator.setStore(transactionalStore);
- ArrayList<String> expressions = new ArrayList<String>();
- expressions.add("getId()");
- expressions.add("getName()");
- outputOperator.setExpressions(expressions);
outputOperator.setup(context);
+ Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+ TestPortContext tpc = new TestPortContext(portAttributes);
+ outputOperator.input.setup(tpc);
+
+ outputOperator.activate(context);
+
List<TestPOJOEvent> events = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
events.add(new TestPOJOEvent(i, "test" + i));
}
outputOperator.beginWindow(0);
- for (TestPOJOEvent event: events) {
+ for (TestPOJOEvent event : events) {
outputOperator.input.process(event);
}
outputOperator.endWindow();
@@ -321,7 +323,7 @@ public class JdbcOperatorTest
}
@Test
- public void TestJdbcInputOperator()
+ public void testJdbcInputOperator()
{
JdbcStore store = new JdbcStore();
store.setDatabaseDriver(DB_DRIVER);
@@ -333,9 +335,9 @@ public class JdbcOperatorTest
TestInputOperator inputOperator = new TestInputOperator();
inputOperator.setStore(store);
- inputOperator.insertEventsInTable(10);
+ insertEventsInTable(10);
- CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+ CollectorTestSink<Object> sink = new CollectorTestSink<>();
inputOperator.outputPort.setSink(sink);
inputOperator.setup(context);
@@ -345,5 +347,73 @@ public class JdbcOperatorTest
Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
}
+
+ @Test
+ public void testJdbcPojoInputOperator()
+ {
+ JdbcStore store = new JdbcStore();
+ store.setDatabaseDriver(DB_DRIVER);
+ store.setDatabaseUrl(URL);
+
+ Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+
+ insertEventsInTable(10);
+
+ JdbcPOJOInputOperator inputOperator = new JdbcPOJOInputOperator();
+ inputOperator.setStore(store);
+ inputOperator.setTableName(TABLE_NAME);
+
+ List<FieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new FieldInfo("ID", "id", null));
+ inputOperator.setFieldInfos(fieldInfos);
+
+ inputOperator.setFetchSize(5);
+
+ CollectorTestSink<Object> sink = new CollectorTestSink<>();
+ inputOperator.outputPort.setSink(sink);
+
+ Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+ TestPortContext tpc = new TestPortContext(portAttributes);
+
+ inputOperator.setup(context);
+ inputOperator.outputPort.setup(tpc);
+
+ inputOperator.activate(context);
+
+ inputOperator.beginWindow(0);
+ inputOperator.emitTuples();
+ inputOperator.endWindow();
+
+ Assert.assertEquals("rows from db", 5, sink.collectedTuples.size());
+ int i = 0;
+ for (Object tuple : sink.collectedTuples) {
+ TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple;
+ Assert.assertTrue("i=" + i, pojoEvent.getId() == i);
+ i++;
+ }
+ sink.collectedTuples.clear();
+
+ inputOperator.beginWindow(1);
+ inputOperator.emitTuples();
+ inputOperator.endWindow();
+
+ Assert.assertEquals("rows from db", 5, sink.collectedTuples.size());
+ for (Object tuple : sink.collectedTuples) {
+ TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple;
+ Assert.assertTrue("i=" + i, pojoEvent.getId() == i);
+ i++;
+ }
+
+ sink.collectedTuples.clear();
+
+ inputOperator.beginWindow(2);
+ inputOperator.emitTuples();
+ inputOperator.endWindow();
+
+ Assert.assertEquals("rows from db", 0, sink.collectedTuples.size());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37438dea/library/src/test/java/com/datatorrent/lib/helper/TestPortContext.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/helper/TestPortContext.java b/library/src/test/java/com/datatorrent/lib/helper/TestPortContext.java
new file mode 100644
index 0000000..8bd72a6
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/helper/TestPortContext.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.helper;
+
+import java.util.Collection;
+
+import javax.validation.constraints.NotNull;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+
+public class TestPortContext implements Context.PortContext
+{
+ public final Attribute.AttributeMap attributeMap;
+
+ public TestPortContext(@NotNull Attribute.AttributeMap attributeMap)
+ {
+ this.attributeMap = Preconditions.checkNotNull(attributeMap, "attributes");
+ }
+
+ @Override
+ public Attribute.AttributeMap getAttributes()
+ {
+ return attributeMap;
+ }
+
+ @Override
+ public <T> T getValue(Attribute<T> key)
+ {
+ return attributeMap.get(key);
+ }
+
+ @Override
+ public void setCounters(Object counters)
+ {
+
+ }
+
+ @Override
+ public void sendMetrics(Collection<String> metricNames)
+ {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37438dea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1468163..c9f5397 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,9 @@
<onlyBinaryIncompatible>false</onlyBinaryIncompatible>
<includeSynthetic>false</includeSynthetic>
<ignoreMissingClasses>true</ignoreMissingClasses>
+ <excludes>
+ <exclude>*POJO*</exclude>
+ </excludes>
</parameter>
<skip>${semver.plugin.skip}</skip>
</configuration>
[2/2] incubator-apex-malhar git commit: Merge branch
'jdbc-schemaIntegration' into devel-3
Posted by ti...@apache.org.
Merge branch 'jdbc-schemaIntegration' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b5016527
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b5016527
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b5016527
Branch: refs/heads/devel-3
Commit: b5016527075135413491a5833da7b1485347c0a5
Parents: 59f21fb 37438de
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Thu Sep 10 13:19:03 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Sep 10 13:19:03 2015 -0700
----------------------------------------------------------------------
.../lib/db/jdbc/JdbcPOJOInputOperator.java | 620 +++++++++++++++++++
.../lib/db/jdbc/JdbcPOJOOutputOperator.java | 315 ++++++----
.../com/datatorrent/lib/db/jdbc/JdbcStore.java | 21 +-
.../lib/db/jdbc/JdbcOperatorTest.java | 164 +++--
.../datatorrent/lib/helper/TestPortContext.java | 59 ++
pom.xml | 3 +
6 files changed, 1000 insertions(+), 182 deletions(-)
----------------------------------------------------------------------