You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/11/24 12:08:00 UTC
apex-malhar git commit: APEXMALHAR-2340 code changes to initialize
the list of JdbcFieldInfo in JdbcPOJOInsertOutput from properties.xml
Repository: apex-malhar
Updated Branches:
refs/heads/master f617d5e35 -> e01cf9c44
APEXMALHAR-2340 code changes to initialize the list of JdbcFieldInfo in JdbcPOJOInsertOutput from properties.xml
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e01cf9c4
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e01cf9c4
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e01cf9c4
Branch: refs/heads/master
Commit: e01cf9c447a34791e6de5efb160f91ac20a87df9
Parents: f617d5e
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Thu Nov 17 10:49:20 2016 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Thu Nov 24 16:24:53 2016 +0530
----------------------------------------------------------------------
.../db/jdbc/AbstractJdbcPOJOOutputOperator.java | 27 +++-
.../db/jdbc/JdbcPOJOInsertOutputOperator.java | 4 +-
.../jdbc/JdbcPojoOperatorApplicationTest.java | 129 +++++++++++++++++++
library/src/test/resources/JdbcProperties.xml | 53 ++++++++
4 files changed, 209 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e01cf9c4/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
index 38d44a0..99b14da 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
@@ -25,10 +25,12 @@ import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
+import java.util.ArrayList;
import java.util.List;
import javax.validation.constraints.NotNull;
+import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,8 +63,7 @@ import com.datatorrent.lib.util.PojoUtils.GetterShort;
@org.apache.hadoop.classification.InterfaceStability.Evolving
public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
{
- private List<JdbcFieldInfo> fieldInfos;
-
+ private List<JdbcFieldInfo> fieldInfos = new ArrayList<>();
protected List<Integer> columnDataTypes;
@NotNull
@@ -295,4 +296,26 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac
}
}
+ /**
+ * Function to initialize the list of {@link JdbcFieldInfo} from properties.xml
+ * @param index
+ * @param value
+ */
+ public void setFieldInfosItem(int index, String value)
+ {
+ try {
+ JSONObject jo = new JSONObject(value);
+ JdbcFieldInfo jdbcFieldInfo = new JdbcFieldInfo(jo.getString("columnName"), jo.getString("pojoFieldExpression"),
+ FieldInfo.SupportType.valueOf(jo.getString("type")), jo.getInt("sqlType"));
+ final int need = index - fieldInfos.size() + 1;
+ for (int i = 0; i < need; i++) {
+ fieldInfos.add(null);
+ }
+ fieldInfos.set(index,jdbcFieldInfo);
+ } catch (Exception e) {
+ throw new RuntimeException("Exception in setting JdbcFieldInfo");
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e01cf9c4/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
index 706757a..8fe20fe 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
@@ -67,7 +67,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
* columnNamesSet is the set having column names given by the user
*/
HashSet<String> columnNamesSet = new HashSet<>();
- if (getFieldInfos() == null) { // then assume direct mapping
+ if (getFieldInfos() == null || getFieldInfos().size() == 0) { // then assume direct mapping
LOG.info("FieldInfo missing. Assuming direct mapping between POJO fields and DB columns");
} else {
// FieldInfo supplied by user
@@ -93,7 +93,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
@Override
public void activate(OperatorContext context)
{
- if (getFieldInfos() == null) {
+ if (getFieldInfos() == null || getFieldInfos().size() == 0) {
Field[] fields = pojoClass.getDeclaredFields();
// Create fieldInfos in case of direct mapping
List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e01cf9c4/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorApplicationTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorApplicationTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorApplicationTest.java
new file mode 100644
index 0000000..fe43327
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorApplicationTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.Callable;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.StramLocalCluster;
+
+public class JdbcPojoOperatorApplicationTest extends JdbcOperatorTest
+{
+ public static int TupleCount;
+ public static com.datatorrent.lib.parser.XmlParserTest.EmployeeBean obj;
+
+ public int getNumOfRowsinTable(String tableName)
+ {
+ Connection con;
+ try {
+ con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+ String countQuery = "SELECT count(*) from " + tableName;
+ ResultSet resultSet = stmt.executeQuery(countQuery);
+ resultSet.next();
+ return resultSet.getInt(1);
+ } catch (SQLException e) {
+ throw new RuntimeException("fetching count", e);
+ }
+ }
+
+ @Test
+ public void testApplication() throws Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/JdbcProperties.xml"));
+ lma.prepareDAG(new JdbcPojoOperatorApplication(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(false);
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return TupleCount == 10;
+ }
+ });
+ lc.run(10000);// runs for 10 seconds and quits
+ Assert.assertEquals("rows in db", 10, getNumOfRowsinTable(TABLE_POJO_NAME));
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+ public static class JdbcPojoOperatorApplication implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration configuration)
+ {
+ JdbcPOJOInsertOutputOperator jdbc = dag.addOperator("JdbcOutput", new JdbcPOJOInsertOutputOperator());
+ JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+ outputStore.setDatabaseDriver(DB_DRIVER);
+ outputStore.setDatabaseUrl(URL);
+ jdbc.setStore(outputStore);
+ jdbc.setBatchSize(3);
+ jdbc.setTablename(TABLE_POJO_NAME);
+ dag.getMeta(jdbc).getMeta(jdbc.input).getAttributes().put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+ cleanTable();
+ JdbcPojoEmitterOperator input = dag.addOperator("data", new JdbcPojoEmitterOperator());
+ dag.addStream("pojo", input.output, jdbc.input);
+ }
+ }
+
+ public static class JdbcPojoEmitterOperator extends BaseOperator implements InputOperator
+ {
+ public static int emitTuple = 10;
+ public final transient DefaultOutputPort<TestPOJOEvent> output = new DefaultOutputPort<TestPOJOEvent>();
+
+ @Override
+ public void emitTuples()
+ {
+ if (emitTuple > 0) {
+ output.emit(new TestPOJOEvent(emitTuple,"test" + emitTuple));
+ emitTuple--;
+ TupleCount++;
+ }
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ TupleCount = 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e01cf9c4/library/src/test/resources/JdbcProperties.xml
----------------------------------------------------------------------
diff --git a/library/src/test/resources/JdbcProperties.xml b/library/src/test/resources/JdbcProperties.xml
new file mode 100644
index 0000000..3e76cf7
--- /dev/null
+++ b/library/src/test/resources/JdbcProperties.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+
+ <property>
+ <name>dt.operator.JdbcOutput.fieldInfosItem[0]</name>
+ <value>
+ {
+ "sqlType": 0,
+ "columnName":"ID",
+ "pojoFieldExpression": "id",
+ "type":"INTEGER"
+ }
+
+ </value>
+ </property>
+
+
+ <property>
+ <name>dt.operator.JdbcOutput.fieldInfosItem[1]</name>
+ <value>
+ {
+ "sqlType": 0,
+ "columnName":"NAME",
+ "pojoFieldExpression": "name",
+ "type":"STRING"
+ }
+
+ </value>
+ </property>
+
+
+
+</configuration>