You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sa...@apache.org on 2016/07/01 08:04:10 UTC
[2/3] apex-malhar git commit: APEXMALHAR-1953: Renaming
JdbcPOJOOutputOperator to AbstractJdbcPOJOOutputOperator
APEXMALHAR-1953: Renaming JdbcPOJOOutputOperator to AbstractJdbcPOJOOutputOperator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f6ba2d03
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f6ba2d03
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f6ba2d03
Branch: refs/heads/master
Commit: f6ba2d037711063f60175a552fd6ae54b5e59623
Parents: 13883da
Author: bhupesh <bh...@gmail.com>
Authored: Mon Mar 14 18:45:07 2016 +0530
Committer: bhupesh <bh...@gmail.com>
Committed: Fri Jul 1 11:53:02 2016 +0530
----------------------------------------------------------------------
.../db/jdbc/AbstractJdbcPOJOOutputOperator.java | 352 +++++++++++++++++++
.../lib/db/jdbc/JdbcPOJOOutputOperator.java | 352 -------------------
.../lib/db/jdbc/JdbcOperatorTest.java | 2 +-
3 files changed, 353 insertions(+), 353 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f6ba2d03/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
new file mode 100644
index 0000000..da491aa
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.GetterDouble;
+import com.datatorrent.lib.util.PojoUtils.GetterFloat;
+import com.datatorrent.lib.util.PojoUtils.GetterInt;
+import com.datatorrent.lib.util.PojoUtils.GetterLong;
+import com.datatorrent.lib.util.PojoUtils.GetterShort;
+
+/**
+ * <p>
+ * JdbcPOJOOutputOperator class.</p>
+ * A Generic implementation of AbstractJdbcTransactionableOutputOperator which takes in any POJO.
+ *
+ * @displayName Jdbc Output Operator
+ * @category Output
+ * @tags database, sql, pojo, jdbc
+ * @since 2.1.0
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
+ implements Operator.ActivationListener<OperatorContext>
+{
+ @NotNull
+ private List<FieldInfo> fieldInfos;
+
+ private List<Integer> columnDataTypes;
+
+ @NotNull
+ private String tablename;
+
+ private final transient List<JdbcPOJOInputOperator.ActiveFieldInfo> columnFieldGetters;
+
+ private String insertStatement;
+
+ private transient Class<?> pojoClass;
+
+ @InputPortFieldAnnotation(optional = true, schemaRequired = true)
+ public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+
+ @Override
+ public void process(Object t)
+ {
+ AbstractJdbcPOJOOutputOperator.super.input.process(t);
+ }
+
+ };
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ StringBuilder columns = new StringBuilder();
+ StringBuilder values = new StringBuilder();
+ for (int i = 0; i < fieldInfos.size(); i++) {
+ columns.append(fieldInfos.get(i).getColumnName());
+ values.append("?");
+ if (i < fieldInfos.size() - 1) {
+ columns.append(",");
+ values.append(",");
+ }
+ }
+ insertStatement = "INSERT INTO "
+ + tablename
+ + " (" + columns.toString() + ")"
+ + " VALUES (" + values.toString() + ")";
+ LOG.debug("insert statement is {}", insertStatement);
+
+ super.setup(context);
+
+ if (columnDataTypes == null) {
+ try {
+ populateColumnDataTypes(columns.toString());
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ for (FieldInfo fi : fieldInfos) {
+ columnFieldGetters.add(new JdbcPOJOInputOperator.ActiveFieldInfo(fi));
+ }
+ }
+
+ protected void populateColumnDataTypes(String columns) throws SQLException
+ {
+ columnDataTypes = Lists.newArrayList();
+ try (Statement st = store.getConnection().createStatement()) {
+ ResultSet rs = st.executeQuery("select " + columns + " from " + tablename);
+
+ ResultSetMetaData rsMetaData = rs.getMetaData();
+ LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
+
+ for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
+ int type = rsMetaData.getColumnType(i);
+ columnDataTypes.add(type);
+ LOG.debug("column name {} type {}", rsMetaData.getColumnName(i), type);
+ }
+ }
+ }
+
+ public AbstractJdbcPOJOOutputOperator()
+ {
+ super();
+ columnFieldGetters = Lists.newArrayList();
+ }
+
+ @Override
+ protected String getUpdateCommand()
+ {
+ LOG.debug("insert statement is {}", insertStatement);
+ return insertStatement;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
+ {
+ final int size = columnDataTypes.size();
+ for (int i = 0; i < size; i++) {
+ final int type = columnDataTypes.get(i);
+ JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i);
+ switch (type) {
+ case (Types.CHAR):
+ case (Types.VARCHAR):
+ statement.setString(i + 1, ((Getter<Object, String>)activeFieldInfo.setterOrGetter).get(tuple));
+ break;
+
+ case (Types.BOOLEAN):
+ statement.setBoolean(i + 1, ((GetterBoolean<Object>)activeFieldInfo.setterOrGetter).get(tuple));
+ break;
+
+ case (Types.TINYINT):
+ statement.setByte(i + 1, ((PojoUtils.GetterByte<Object>)activeFieldInfo.setterOrGetter).get(tuple));
+ break;
+
+ case (Types.SMALLINT):
+ statement.setShort(i + 1, ((GetterShort<Object>)activeFieldInfo.setterOrGetter).get(tuple));
+ break;
+
+ case (Types.INTEGER):
+ statement.setInt(i + 1, ((GetterInt<Object>)activeFieldInfo.setterOrGetter).get(tuple));
+ break;
+
+ case (Types.BIGINT):
+ statement.setLong(i + 1, ((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple));
+ break;
+
+ case (Types.FLOAT):
+ statement.setFloat(i + 1, ((GetterFloat<Object>)activeFieldInfo.setterOrGetter).get(tuple));
+ break;
+
+ case (Types.DOUBLE):
+ statement.setDouble(i + 1, ((GetterDouble<Object>)activeFieldInfo.setterOrGetter).get(tuple));
+ break;
+
+ case Types.DECIMAL:
+ statement.setBigDecimal(i + 1, ((Getter<Object, BigDecimal>)activeFieldInfo.setterOrGetter).get(tuple));
+ break;
+
+ case Types.TIMESTAMP:
+ statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
+ break;
+
+ case Types.TIME:
+ statement.setTime(i + 1, new Time(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
+ break;
+
+ case Types.DATE:
+ statement.setDate(i + 1, new Date(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
+ break;
+
+ default:
+ handleUnknownDataType(type, tuple, activeFieldInfo);
+ break;
+ }
+ }
+ }
+
+ @SuppressWarnings("UnusedParameters")
+ protected void handleUnknownDataType(int type, Object tuple, JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo)
+ {
+ throw new RuntimeException("unsupported data type " + type);
+ }
+
+ /**
+ * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
+ */
+ public List<FieldInfo> getFieldInfos()
+ {
+ return fieldInfos;
+ }
+
+ /**
+ * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/>
+ * The value from fieldInfo.column is assigned to fieldInfo.pojoFieldExpression.
+ *
+ * @description $[].columnName name of the database column name
+ * @description $[].pojoFieldExpression pojo field name or expression
+ * @useSchema $[].pojoFieldExpression input.fields[].name
+ */
+ public void setFieldInfos(List<FieldInfo> fieldInfos)
+ {
+ this.fieldInfos = fieldInfos;
+ }
+
+ /*
+ * Gets the name of the table in database.
+ */
+ public String getTablename()
+ {
+ return tablename;
+ }
+
+ public void setTablename(String tablename)
+ {
+ this.tablename = tablename;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPOJOOutputOperator.class);
+
+ @Override
+ public void activate(OperatorContext context)
+ {
+ final int size = columnDataTypes.size();
+ for (int i = 0; i < size; i++) {
+ final int type = columnDataTypes.get(i);
+ JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i);
+ switch (type) {
+ case (Types.CHAR):
+ case (Types.VARCHAR):
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+ String.class);
+ break;
+
+ case (Types.BOOLEAN):
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.TINYINT):
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.SMALLINT):
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.INTEGER):
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.BIGINT):
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.FLOAT):
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case (Types.DOUBLE):
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case Types.DECIMAL:
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(), BigDecimal.class);
+ break;
+
+ case Types.TIMESTAMP:
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case Types.TIME:
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ case Types.DATE:
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ break;
+
+ default:
+ handleUnknownDataType(type, null, activeFieldInfo);
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void deactivate()
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f6ba2d03/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
deleted file mode 100644
index a7dcb2a..0000000
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.db.jdbc;
-
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
-import java.util.List;
-
-import javax.validation.constraints.NotNull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.lib.util.FieldInfo;
-import com.datatorrent.lib.util.PojoUtils;
-import com.datatorrent.lib.util.PojoUtils.Getter;
-import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
-import com.datatorrent.lib.util.PojoUtils.GetterDouble;
-import com.datatorrent.lib.util.PojoUtils.GetterFloat;
-import com.datatorrent.lib.util.PojoUtils.GetterInt;
-import com.datatorrent.lib.util.PojoUtils.GetterLong;
-import com.datatorrent.lib.util.PojoUtils.GetterShort;
-
-/**
- * <p>
- * JdbcPOJOOutputOperator class.</p>
- * A Generic implementation of AbstractJdbcTransactionableOutputOperator which takes in any POJO.
- *
- * @displayName Jdbc Output Operator
- * @category Output
- * @tags database, sql, pojo, jdbc
- * @since 2.1.0
- */
-@org.apache.hadoop.classification.InterfaceStability.Evolving
-public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
- implements Operator.ActivationListener<OperatorContext>
-{
- @NotNull
- private List<FieldInfo> fieldInfos;
-
- private List<Integer> columnDataTypes;
-
- @NotNull
- private String tablename;
-
- private final transient List<JdbcPOJOInputOperator.ActiveFieldInfo> columnFieldGetters;
-
- private String insertStatement;
-
- private transient Class<?> pojoClass;
-
- @InputPortFieldAnnotation(optional = true, schemaRequired = true)
- public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
- {
- @Override
- public void setup(Context.PortContext context)
- {
- pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
- }
-
- @Override
- public void process(Object t)
- {
- JdbcPOJOOutputOperator.super.input.process(t);
- }
-
- };
-
- @Override
- public void setup(OperatorContext context)
- {
- StringBuilder columns = new StringBuilder();
- StringBuilder values = new StringBuilder();
- for (int i = 0; i < fieldInfos.size(); i++) {
- columns.append(fieldInfos.get(i).getColumnName());
- values.append("?");
- if (i < fieldInfos.size() - 1) {
- columns.append(",");
- values.append(",");
- }
- }
- insertStatement = "INSERT INTO "
- + tablename
- + " (" + columns.toString() + ")"
- + " VALUES (" + values.toString() + ")";
- LOG.debug("insert statement is {}", insertStatement);
-
- super.setup(context);
-
- if (columnDataTypes == null) {
- try {
- populateColumnDataTypes(columns.toString());
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
- for (FieldInfo fi : fieldInfos) {
- columnFieldGetters.add(new JdbcPOJOInputOperator.ActiveFieldInfo(fi));
- }
- }
-
- protected void populateColumnDataTypes(String columns) throws SQLException
- {
- columnDataTypes = Lists.newArrayList();
- try (Statement st = store.getConnection().createStatement()) {
- ResultSet rs = st.executeQuery("select " + columns + " from " + tablename);
-
- ResultSetMetaData rsMetaData = rs.getMetaData();
- LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
-
- for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
- int type = rsMetaData.getColumnType(i);
- columnDataTypes.add(type);
- LOG.debug("column name {} type {}", rsMetaData.getColumnName(i), type);
- }
- }
- }
-
- public JdbcPOJOOutputOperator()
- {
- super();
- columnFieldGetters = Lists.newArrayList();
- }
-
- @Override
- protected String getUpdateCommand()
- {
- LOG.debug("insert statement is {}", insertStatement);
- return insertStatement;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
- {
- final int size = columnDataTypes.size();
- for (int i = 0; i < size; i++) {
- final int type = columnDataTypes.get(i);
- JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i);
- switch (type) {
- case (Types.CHAR):
- case (Types.VARCHAR):
- statement.setString(i + 1, ((Getter<Object, String>)activeFieldInfo.setterOrGetter).get(tuple));
- break;
-
- case (Types.BOOLEAN):
- statement.setBoolean(i + 1, ((GetterBoolean<Object>)activeFieldInfo.setterOrGetter).get(tuple));
- break;
-
- case (Types.TINYINT):
- statement.setByte(i + 1, ((PojoUtils.GetterByte<Object>)activeFieldInfo.setterOrGetter).get(tuple));
- break;
-
- case (Types.SMALLINT):
- statement.setShort(i + 1, ((GetterShort<Object>)activeFieldInfo.setterOrGetter).get(tuple));
- break;
-
- case (Types.INTEGER):
- statement.setInt(i + 1, ((GetterInt<Object>)activeFieldInfo.setterOrGetter).get(tuple));
- break;
-
- case (Types.BIGINT):
- statement.setLong(i + 1, ((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple));
- break;
-
- case (Types.FLOAT):
- statement.setFloat(i + 1, ((GetterFloat<Object>)activeFieldInfo.setterOrGetter).get(tuple));
- break;
-
- case (Types.DOUBLE):
- statement.setDouble(i + 1, ((GetterDouble<Object>)activeFieldInfo.setterOrGetter).get(tuple));
- break;
-
- case Types.DECIMAL:
- statement.setBigDecimal(i + 1, ((Getter<Object, BigDecimal>)activeFieldInfo.setterOrGetter).get(tuple));
- break;
-
- case Types.TIMESTAMP:
- statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
- break;
-
- case Types.TIME:
- statement.setTime(i + 1, new Time(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
- break;
-
- case Types.DATE:
- statement.setDate(i + 1, new Date(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
- break;
-
- default:
- handleUnknownDataType(type, tuple, activeFieldInfo);
- break;
- }
- }
- }
-
- @SuppressWarnings("UnusedParameters")
- protected void handleUnknownDataType(int type, Object tuple, JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo)
- {
- throw new RuntimeException("unsupported data type " + type);
- }
-
- /**
- * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
- */
- public List<FieldInfo> getFieldInfos()
- {
- return fieldInfos;
- }
-
- /**
- * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/>
- * The value from fieldInfo.column is assigned to fieldInfo.pojoFieldExpression.
- *
- * @description $[].columnName name of the database column name
- * @description $[].pojoFieldExpression pojo field name or expression
- * @useSchema $[].pojoFieldExpression input.fields[].name
- */
- public void setFieldInfos(List<FieldInfo> fieldInfos)
- {
- this.fieldInfos = fieldInfos;
- }
-
- /*
- * Gets the name of the table in database.
- */
- public String getTablename()
- {
- return tablename;
- }
-
- public void setTablename(String tablename)
- {
- this.tablename = tablename;
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOOutputOperator.class);
-
- @Override
- public void activate(OperatorContext context)
- {
- final int size = columnDataTypes.size();
- for (int i = 0; i < size; i++) {
- final int type = columnDataTypes.get(i);
- JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i);
- switch (type) {
- case (Types.CHAR):
- case (Types.VARCHAR):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression(),
- String.class);
- break;
-
- case (Types.BOOLEAN):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression());
- break;
-
- case (Types.TINYINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression());
- break;
-
- case (Types.SMALLINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression());
- break;
-
- case (Types.INTEGER):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression());
- break;
-
- case (Types.BIGINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression());
- break;
-
- case (Types.FLOAT):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression());
- break;
-
- case (Types.DOUBLE):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression());
- break;
-
- case Types.DECIMAL:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression(), BigDecimal.class);
- break;
-
- case Types.TIMESTAMP:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression());
- break;
-
- case Types.TIME:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression());
- break;
-
- case Types.DATE:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression());
- break;
-
- default:
- handleUnknownDataType(type, null, activeFieldInfo);
- break;
- }
- }
- }
-
- @Override
- public void deactivate()
- {
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f6ba2d03/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
index 6f2688f..ad7e676 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
@@ -238,7 +238,7 @@ public class JdbcOperatorTest
}
}
- private static class TestPOJOOutputOperator extends JdbcPOJOOutputOperator
+ private static class TestPOJOOutputOperator extends AbstractJdbcPOJOOutputOperator
{
TestPOJOOutputOperator()
{