You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/11/24 12:08:00 UTC

apex-malhar git commit: APEXMALHAR-2340 code changes to initialize the list of JdbcFieldInfo in JdbcPOJOInsertOutput from properties.xml

Repository: apex-malhar
Updated Branches:
  refs/heads/master f617d5e35 -> e01cf9c44


APEXMALHAR-2340 code changes to initialize the list of JdbcFieldInfo in JdbcPOJOInsertOutput from properties.xml


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e01cf9c4
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e01cf9c4
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e01cf9c4

Branch: refs/heads/master
Commit: e01cf9c447a34791e6de5efb160f91ac20a87df9
Parents: f617d5e
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Thu Nov 17 10:49:20 2016 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Thu Nov 24 16:24:53 2016 +0530

----------------------------------------------------------------------
 .../db/jdbc/AbstractJdbcPOJOOutputOperator.java |  27 +++-
 .../db/jdbc/JdbcPOJOInsertOutputOperator.java   |   4 +-
 .../jdbc/JdbcPojoOperatorApplicationTest.java   | 129 +++++++++++++++++++
 library/src/test/resources/JdbcProperties.xml   |  53 ++++++++
 4 files changed, 209 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e01cf9c4/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
index 38d44a0..99b14da 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
@@ -25,10 +25,12 @@ import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.List;
 
 import javax.validation.constraints.NotNull;
 
+import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,8 +63,7 @@ import com.datatorrent.lib.util.PojoUtils.GetterShort;
 @org.apache.hadoop.classification.InterfaceStability.Evolving
 public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
 {
-  private List<JdbcFieldInfo> fieldInfos;
-
+  private List<JdbcFieldInfo> fieldInfos = new ArrayList<>();
   protected List<Integer> columnDataTypes;
 
   @NotNull
@@ -295,4 +296,26 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac
     }
   }
 
+  /**
+   * Sets the {@link JdbcFieldInfo} at {@code index} from a JSON string such as a properties.xml value.
+   * @param index position in the field-info list; the list is padded with nulls up to this position
+   * @param value JSON object with the keys columnName, pojoFieldExpression, type and sqlType
+   * @throws RuntimeException if value cannot be parsed or applied; the original failure is kept as the cause
+   */
+  public void setFieldInfosItem(int index, String value)
+  {
+    try {
+      JSONObject jo = new JSONObject(value);
+      JdbcFieldInfo jdbcFieldInfo = new JdbcFieldInfo(jo.getString("columnName"), jo.getString("pojoFieldExpression"),
+          FieldInfo.SupportType.valueOf(jo.getString("type")), jo.getInt("sqlType"));
+      while (fieldInfos.size() <= index) {
+        fieldInfos.add(null);
+      }
+      fieldInfos.set(index, jdbcFieldInfo);
+    } catch (Exception e) {
+      throw new RuntimeException("Exception in setting JdbcFieldInfo", e);
+    }
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e01cf9c4/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
index 706757a..8fe20fe 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
@@ -67,7 +67,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
        * columnNamesSet is the set having column names given by the user
        */
       HashSet<String> columnNamesSet = new HashSet<>();
-      if (getFieldInfos() == null) { // then assume direct mapping
+      if (getFieldInfos() == null || getFieldInfos().size() == 0) { // then assume direct mapping
         LOG.info("FieldInfo missing. Assuming direct mapping between POJO fields and DB columns");
       } else {
         // FieldInfo supplied by user
@@ -93,7 +93,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
   @Override
   public void activate(OperatorContext context)
   {
-    if (getFieldInfos() == null) {
+    if (getFieldInfos() == null || getFieldInfos().size() == 0) {
       Field[] fields = pojoClass.getDeclaredFields();
       // Create fieldInfos in case of direct mapping
       List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e01cf9c4/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorApplicationTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorApplicationTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorApplicationTest.java
new file mode 100644
index 0000000..fe43327
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorApplicationTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.Callable;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.StramLocalCluster;
+
+/** Checks that a local DAG using JdbcPOJOInsertOutputOperator, configured from JdbcProperties.xml, writes 10 rows. */
+public class JdbcPojoOperatorApplicationTest extends JdbcOperatorTest
+{
+  public static int TupleCount;
+  public static com.datatorrent.lib.parser.XmlParserTest.EmployeeBean obj;
+
+  /** Returns the row count of {@code tableName}; all JDBC resources are closed before returning. */
+  public int getNumOfRowsinTable(String tableName)
+  {
+    String countQuery = "SELECT count(*) from " + tableName;
+    try (Connection con = DriverManager.getConnection(URL);
+        Statement stmt = con.createStatement();
+        ResultSet resultSet = stmt.executeQuery(countQuery)) {
+      resultSet.next();
+      return resultSet.getInt(1);
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching count", e);
+    }
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/JdbcProperties.xml"));
+      lma.prepareDAG(new JdbcPojoOperatorApplication(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.setHeartbeatMonitoringEnabled(false);
+      ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+      {
+        @Override
+        public Boolean call() throws Exception
+        {
+          return TupleCount == 10; // equals the emitter's initial emitTuple budget of 10
+        }
+      });
+      lc.run(10000); // runs for 10 seconds and quits
+      Assert.assertEquals("rows in db", 10, getNumOfRowsinTable(TABLE_POJO_NAME));
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  public static class JdbcPojoOperatorApplication implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration configuration)
+    {
+      JdbcPOJOInsertOutputOperator jdbc = dag.addOperator("JdbcOutput", new JdbcPOJOInsertOutputOperator());
+      JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+      outputStore.setDatabaseDriver(DB_DRIVER);
+      outputStore.setDatabaseUrl(URL);
+      jdbc.setStore(outputStore);
+      jdbc.setBatchSize(3);
+      jdbc.setTablename(TABLE_POJO_NAME);
+      dag.getMeta(jdbc).getMeta(jdbc.input).getAttributes().put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+      cleanTable();
+      JdbcPojoEmitterOperator input = dag.addOperator("data", new JdbcPojoEmitterOperator());
+      dag.addStream("pojo", input.output, jdbc.input);
+    }
+  }
+
+  public static class JdbcPojoEmitterOperator extends BaseOperator implements InputOperator
+  {
+    public static int emitTuple = 10;
+    public final transient DefaultOutputPort<TestPOJOEvent> output = new DefaultOutputPort<TestPOJOEvent>();
+
+    @Override
+    public void emitTuples()
+    {
+      if (emitTuple > 0) {
+        output.emit(new TestPOJOEvent(emitTuple, "test" + emitTuple));
+        emitTuple--;
+        TupleCount++;
+      }
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      TupleCount = 0;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e01cf9c4/library/src/test/resources/JdbcProperties.xml
----------------------------------------------------------------------
diff --git a/library/src/test/resources/JdbcProperties.xml b/library/src/test/resources/JdbcProperties.xml
new file mode 100644
index 0000000..3e76cf7
--- /dev/null
+++ b/library/src/test/resources/JdbcProperties.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- Each fieldInfosItem[N] value is JSON for one JdbcFieldInfo (keys: columnName, pojoFieldExpression, type, sqlType); N is presumably the list index - TODO confirm. -->
+  <property>
+    <name>dt.operator.JdbcOutput.fieldInfosItem[0]</name>
+    <value>
+      {
+      "sqlType": 0,
+      "columnName":"ID",
+      "pojoFieldExpression": "id",
+      "type":"INTEGER"
+      }
+
+    </value>
+  </property>
+
+
+  <property>
+    <name>dt.operator.JdbcOutput.fieldInfosItem[1]</name>
+    <value>
+      {
+      "sqlType": 0,
+      "columnName":"NAME",
+      "pojoFieldExpression": "name",
+      "type":"STRING"
+      }
+
+    </value>
+  </property>
+
+
+
+</configuration>