You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/12/01 12:44:06 UTC
apex-malhar git commit: APEXMALHAR-2344 code changes to initialize
the list of FieldInfo from properties.xml
Repository: apex-malhar
Updated Branches:
refs/heads/master a5e8fa3fa -> ee77dc654
APEXMALHAR-2344 code changes to initialize the list of FieldInfo from properties.xml
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ee77dc65
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ee77dc65
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ee77dc65
Branch: refs/heads/master
Commit: ee77dc654692c6a780c9f54a98f838c5f14e6527
Parents: a5e8fa3
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Mon Nov 28 19:48:31 2016 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Thu Dec 1 12:21:58 2016 +0530
----------------------------------------------------------------------
.../lib/db/jdbc/JdbcPOJOInputOperator.java | 36 +++-
.../lib/db/jdbc/JdbcPOJOPollInputOperator.java | 37 +++-
.../jdbc/JdbcInputOperatorApplicationTest.java | 167 +++++++++++++++++++
library/src/test/resources/JdbcProperties.xml | 131 +++++++++++++++
4 files changed, 369 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ee77dc65/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
index 59c2807..e587d85 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
@@ -27,12 +27,14 @@ import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
+import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,7 +82,7 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
private boolean mysqlSyntax;
@NotNull
- private List<FieldInfo> fieldInfos;
+ private List<FieldInfo> fieldInfos = new ArrayList<>(); // starts empty, not null, so setFieldInfosItem can fill it by index
@Min(1)
private int fetchSize;
@@ -632,5 +634,37 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
this.mysqlSyntax = mysqlSyntax;
}
+ /**
+  * Initializes one entry of the list of {@link FieldInfo} externally from a configuration/properties file.
+  * Example entry in the properties/configuration file:
+  * <pre>{@code
+  * <property>
+  *   <name>dt.operator.JdbcPOJOInput.fieldInfosItem[0]</name>
+  *   <value>{"columnName":"ID", "pojoFieldExpression": "id", "type":"INTEGER"}</value>
+  * </property>
+  * }</pre>
+  * If index is past the end of the list, the gap is filled with null placeholders.
+  * @param index is the index in the list which is to be initialized.
+  * @param value is the JSON String with the columnName, pojoFieldExpression and type mappings
+  *          for {@link FieldInfo}; type is resolved through FieldInfo.SupportType.valueOf.
+  * @throws RuntimeException wrapping (as its cause) any failure to parse or store the entry.
+  */
+ public void setFieldInfosItem(int index, String value)
+ {
+   try {
+     JSONObject jo = new JSONObject(value);
+     FieldInfo fieldInfo = new FieldInfo(jo.getString("columnName"), jo.getString("pojoFieldExpression"),
+         FieldInfo.SupportType.valueOf(jo.getString("type")));
+     // Pad the list with null placeholders until the requested slot exists.
+     final int need = index - fieldInfos.size() + 1;
+     for (int i = 0; i < need; i++) {
+       fieldInfos.add(null);
+     }
+     fieldInfos.set(index, fieldInfo);
+   } catch (Exception e) {
+     throw new RuntimeException("Exception in setting FieldInfo " + value + " " + e.getMessage(), e);
+   }
+ }
+
public static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOInputOperator.class);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ee77dc65/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
index 62618de..129981a 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
@@ -27,11 +27,13 @@ import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.NotNull;
+import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +67,7 @@ public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Obj
protected List<Integer> columnDataTypes;
protected transient Class<?> pojoClass;
@NotNull
- private List<FieldInfo> fieldInfos;
+ private List<FieldInfo> fieldInfos = new ArrayList<>(); // starts empty, not null, so setFieldInfosItem can fill it by index
@OutputPortFieldAnnotation(schemaRequired = true)
public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
@@ -323,5 +325,38 @@ public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Obj
this.fieldInfos = fieldInfos;
}
+ /**
+  * Initializes one entry of the list of {@link FieldInfo} externally from a configuration/properties file.
+  * Example entry in the properties/configuration file:
+  * <pre>{@code
+  * <property>
+  *   <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[0]</name>
+  *   <value>{"columnName":"ID", "pojoFieldExpression": "id", "type":"INTEGER"}</value>
+  * </property>
+  * }</pre>
+  * If index is past the end of the list, the gap is filled with null placeholders.
+  * @param index is the index in the list which is to be initialized.
+  * @param value is the JSON String with the columnName, pojoFieldExpression and type mappings
+  *          for {@link FieldInfo}; type is resolved through FieldInfo.SupportType.valueOf.
+  * @throws RuntimeException if value cannot be parsed or the entry cannot be stored; the
+  *           original exception is attached as the cause.
+  */
+ public void setFieldInfosItem(int index, String value)
+ {
+   try {
+     JSONObject jo = new JSONObject(value);
+     FieldInfo fieldInfo = new FieldInfo(jo.getString("columnName"), jo.getString("pojoFieldExpression"),
+         FieldInfo.SupportType.valueOf(jo.getString("type")));
+     // Pad the list with null placeholders until the requested slot exists.
+     final int need = index - fieldInfos.size() + 1;
+     for (int i = 0; i < need; i++) {
+       fieldInfos.add(null);
+     }
+     fieldInfos.set(index, fieldInfo);
+   } catch (Exception e) {
+     throw new RuntimeException("Exception in setting FieldInfo " + value + " " + e.getMessage(), e);
+   }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOPollInputOperator.class);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ee77dc65/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcInputOperatorApplicationTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcInputOperatorApplicationTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcInputOperatorApplicationTest.java
new file mode 100644
index 0000000..c59978a
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcInputOperatorApplicationTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.Callable;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Tests to check if setting List of {@link FieldInfo} externally from
+ * configuration file works fine for Jdbc input operators.
+ */
+public class JdbcInputOperatorApplicationTest extends JdbcOperatorTest
+{
+ public static int TupleCount;
+
+ public int getNumOfRowsinTable(String tableName)
+ {
+ Connection con;
+ try {
+ con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+ String countQuery = "SELECT count(*) from " + tableName;
+ ResultSet resultSet = stmt.executeQuery(countQuery);
+ resultSet.next();
+ return resultSet.getInt(1);
+ } catch (SQLException e) {
+ throw new RuntimeException("fetching count", e);
+ }
+ }
+
+ /** Runs the application locally with /JdbcProperties.xml and checks TupleCount against the table's row count. */
+ public void testApplication(StreamingApplication streamingApplication) throws Exception
+ {
+   try {
+     LocalMode lma = LocalMode.newInstance();
+     Configuration conf = new Configuration(false);
+     conf.addResource(this.getClass().getResourceAsStream("/JdbcProperties.xml"));
+     lma.prepareDAG(streamingApplication, conf);
+     LocalMode.Controller lc = lma.getController();
+     lc.setHeartbeatMonitoringEnabled(false);
+     ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+     {
+       @Override
+       public Boolean call() throws Exception
+       {
+         return TupleCount == 10; // 10 matches the insertEvents(10, ...) calls made by the applications
+       }
+     });
+     lc.run(10000); // runs for 10 seconds and quits; presumably earlier once the exit condition holds
+     Assert.assertEquals("rows in db", getNumOfRowsinTable(TABLE_POJO_NAME), TupleCount); // expected first, then actual
+   } catch (ConstraintViolationException e) {
+     Assert.fail("constraint violations: " + e.getConstraintViolations());
+   }
+ }
+
+ @Test
+ public void testJdbcPOJOPollInputOperatorApplication() throws Exception
+ {
+   // Exercises the application built around the poll-based operator.
+   final StreamingApplication pollApplication = new JdbcPOJOPollInputOperatorApplication();
+   testApplication(pollApplication);
+ }
+
+ @Test
+ public void testJdbcPOJOInputOperatorApplication() throws Exception
+ {
+   // Exercises the application built around the non-polling operator.
+   final StreamingApplication inputApplication = new JdbcPOJOInputOperatorApplication();
+   testApplication(inputApplication);
+ }
+
+
+ /** Test application that connects a {@link JdbcPOJOInputOperator} to a {@link ResultCollector}. */
+ public static class JdbcPOJOInputOperatorApplication implements StreamingApplication
+ {
+   @Override
+   public void populateDAG(DAG dag, Configuration configuration)
+   {
+     // Prepare the table first; cleanTable/insertEvents are defined outside this file.
+     cleanTable();
+     insertEvents(10, true, 0);
+
+     final JdbcStore jdbcStore = new JdbcStore();
+     jdbcStore.setDatabaseDriver(DB_DRIVER);
+     jdbcStore.setDatabaseUrl(URL);
+
+     // No field mappings are set here; the test supplies them through JdbcProperties.xml.
+     final JdbcPOJOInputOperator reader = dag.addOperator("JdbcPOJOInput", new JdbcPOJOInputOperator());
+     reader.setStore(jdbcStore);
+     reader.setTableName(TABLE_POJO_NAME);
+     reader.setFetchSize(100);
+     dag.getMeta(reader).getMeta(reader.outputPort).getAttributes()
+         .put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+
+     final ResultCollector collector = dag.addOperator("result", new ResultCollector());
+     dag.addStream("pojo", reader.outputPort, collector.input);
+   }
+ }
+
+ /** Test application that connects a {@link JdbcPOJOPollInputOperator} to a {@link ResultCollector}. */
+ public static class JdbcPOJOPollInputOperatorApplication implements StreamingApplication
+ {
+   @Override
+   public void populateDAG(DAG dag, Configuration configuration)
+   {
+     // Prepare the table first; cleanTable/insertEvents are defined outside this file.
+     cleanTable();
+     insertEvents(10, true, 0);
+
+     final JdbcStore jdbcStore = new JdbcStore();
+     jdbcStore.setDatabaseDriver(DB_DRIVER);
+     jdbcStore.setDatabaseUrl(URL);
+
+     // No field mappings are set here; the test supplies them through JdbcProperties.xml.
+     final JdbcPOJOPollInputOperator poller =
+         dag.addOperator("JdbcPOJOPollInput", new JdbcPOJOPollInputOperator());
+     poller.setStore(jdbcStore);
+     poller.setTableName(TABLE_POJO_NAME);
+     poller.setKey("id");
+     poller.setFetchSize(100);
+     poller.setBatchSize(100);
+     poller.setPartitionCount(2);
+     dag.getMeta(poller).getMeta(poller.outputPort).getAttributes()
+         .put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+
+     final ResultCollector collector = dag.addOperator("result", new ResultCollector());
+     dag.addStream("pojo", poller.outputPort, collector.input);
+   }
+ }
+
+ /** Sink operator that counts the tuples it receives in the static {@code TupleCount} field. */
+ public static class ResultCollector extends BaseOperator
+ {
+   @Override
+   public void setup(Context.OperatorContext context)
+   {
+     // Reset the counter whenever the operator is set up.
+     TupleCount = 0;
+   }
+
+   public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+   {
+     @Override
+     public void process(Object tuple)
+     {
+       // The cast is kept as a type check: a tuple of any other class fails here.
+       TestPOJOEvent checkedTuple = (TestPOJOEvent)tuple;
+       TupleCount = TupleCount + 1;
+     }
+   };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ee77dc65/library/src/test/resources/JdbcProperties.xml
----------------------------------------------------------------------
diff --git a/library/src/test/resources/JdbcProperties.xml b/library/src/test/resources/JdbcProperties.xml
index 3e76cf7..431c113 100644
--- a/library/src/test/resources/JdbcProperties.xml
+++ b/library/src/test/resources/JdbcProperties.xml
@@ -48,6 +48,137 @@
</value>
</property>
+ <property>
+ <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[0]</name>
+ <value>
+ {
+ "columnName":"ID",
+ "pojoFieldExpression": "id",
+ "type":"INTEGER"
+ }
+
+ </value>
+ </property>
+
+
+ <property>
+ <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[1]</name>
+ <value>
+ {
+ "columnName":"STARTDATE",
+ "pojoFieldExpression": "startDate",
+ "type":"STRING"
+ }
+
+ </value>
+ </property>
+ <property>
+ <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[2]</name>
+ <value>
+ {
+ "columnName":"STARTTIME",
+ "pojoFieldExpression": "startTime",
+ "type":"STRING"
+ }
+
+ </value>
+ </property>
+
+ <property>
+ <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[3]</name>
+ <value>
+ {
+ "columnName":"STARTTIMESTAMP",
+ "pojoFieldExpression": "startTimestamp",
+ "type":"STRING"
+ }
+
+ </value>
+ </property>
+
+ <property>
+ <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[4]</name>
+ <value>
+ {
+ "columnName":"SCORE",
+ "pojoFieldExpression": "score",
+ "type":"DOUBLE"
+ }
+
+ </value>
+ </property>
+
+ <property>
+ <name>dt.operator.JdbcPOJOPollInput.columnsExpression</name>
+ <value>
+ id,startDate,startTime,startTimestamp,score
+ </value>
+ </property>
+
+
+
+ <property>
+ <name>dt.operator.JdbcPOJOInput.fieldInfosItem[0]</name>
+ <value>
+ {
+ "columnName":"ID",
+ "pojoFieldExpression": "id",
+ "type":"INTEGER"
+ }
+
+ </value>
+ </property>
+
+
+ <property>
+ <name>dt.operator.JdbcPOJOInput.fieldInfosItem[1]</name>
+ <value>
+ {
+ "columnName":"STARTDATE",
+ "pojoFieldExpression": "startDate",
+ "type":"STRING"
+ }
+
+ </value>
+ </property>
+ <property>
+ <name>dt.operator.JdbcPOJOInput.fieldInfosItem[2]</name>
+ <value>
+ {
+ "columnName":"STARTTIME",
+ "pojoFieldExpression": "startTime",
+ "type":"STRING"
+ }
+
+ </value>
+ </property>
+
+ <property>
+ <name>dt.operator.JdbcPOJOInput.fieldInfosItem[3]</name>
+ <value>
+ {
+ "columnName":"STARTTIMESTAMP",
+ "pojoFieldExpression": "startTimestamp",
+ "type":"STRING"
+ }
+
+ </value>
+ </property>
+
+ <property>
+ <name>dt.operator.JdbcPOJOInput.fieldInfosItem[4]</name>
+ <value>
+ {
+ "columnName":"SCORE",
+ "pojoFieldExpression": "score",
+ "type":"DOUBLE"
+ }
+
+ </value>
+ </property>
+
+
+
</configuration>