You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/12/01 12:44:06 UTC

apex-malhar git commit: APEXMALHAR-2344 code changes to initialize the list of FieldInfo from properties.xml

Repository: apex-malhar
Updated Branches:
  refs/heads/master a5e8fa3fa -> ee77dc654


APEXMALHAR-2344 code changes to initialize the list of FieldInfo from properties.xml


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ee77dc65
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ee77dc65
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ee77dc65

Branch: refs/heads/master
Commit: ee77dc654692c6a780c9f54a98f838c5f14e6527
Parents: a5e8fa3
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Mon Nov 28 19:48:31 2016 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Thu Dec 1 12:21:58 2016 +0530

----------------------------------------------------------------------
 .../lib/db/jdbc/JdbcPOJOInputOperator.java      |  36 +++-
 .../lib/db/jdbc/JdbcPOJOPollInputOperator.java  |  37 +++-
 .../jdbc/JdbcInputOperatorApplicationTest.java  | 167 +++++++++++++++++++
 library/src/test/resources/JdbcProperties.xml   | 131 +++++++++++++++
 4 files changed, 369 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ee77dc65/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
index 59c2807..e587d85 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
@@ -27,12 +27,14 @@ import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 
+import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,7 +82,7 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
   private boolean mysqlSyntax;
 
   @NotNull
-  private List<FieldInfo> fieldInfos;
+  private List<FieldInfo> fieldInfos = new ArrayList<>();;
 
   @Min(1)
   private int fetchSize;
@@ -632,5 +634,37 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
     this.mysqlSyntax = mysqlSyntax;
   }
 
+  /**
+   * Function to initialize the list of {@link FieldInfo} externally from configuration/properties file.
+   * Example entry in the properties/configuration file:
+   <property>
+   <name>dt.operator.JdbcPOJOInput.fieldInfosItem[0]</name>
+   <value>
+   {
+   "columnName":"ID",
+   "pojoFieldExpression": "id",
+   "type":"INTEGER"
+   }
+   </value>
+   </property>
+   * @param index is the index in the list which is to be initialized.
+   * @param value is the JSON String with appropriate mappings for {@link FieldInfo}.
+   */
+  public void setFieldInfosItem(int index, String value)
+  {
+    try {
+      JSONObject jo = new JSONObject(value);
+      FieldInfo fieldInfo = new FieldInfo(jo.getString("columnName"), jo.getString("pojoFieldExpression"),
+          FieldInfo.SupportType.valueOf(jo.getString("type")));
+      final int need = index - fieldInfos.size() + 1;
+      for (int i = 0; i < need; i++) {
+        fieldInfos.add(null);
+      }
+      fieldInfos.set(index,fieldInfo);
+    } catch (Exception e) {
+      throw new RuntimeException("Exception in setting FieldInfo " + value + " " + e.getMessage());
+    }
+  }
+
   public static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOInputOperator.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ee77dc65/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
index 62618de..129981a 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
@@ -27,11 +27,13 @@ import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import javax.validation.constraints.NotNull;
 
+import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +67,7 @@ public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Obj
   protected List<Integer> columnDataTypes;
   protected transient Class<?> pojoClass;
   @NotNull
-  private List<FieldInfo> fieldInfos;
+  private List<FieldInfo> fieldInfos = new ArrayList<>();
 
   @OutputPortFieldAnnotation(schemaRequired = true)
   public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
@@ -323,5 +325,38 @@ public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Obj
     this.fieldInfos = fieldInfos;
   }
 
+  /**
+   * Function to initialize the list of {@link FieldInfo} externally from configuration/properties file.
+   * Example entry in the properties/configuration file:
+   <property>
+     <name>dt.operator.JdbcPOJOInput.fieldInfosItem[0]</name>
+     <value>
+      {
+       "columnName":"ID",
+       "pojoFieldExpression": "id",
+       "type":"INTEGER"
+      }
+     </value>
+   </property>
+   * @param index is the index in the list which is to be initialized.
+   * @param value is the JSON String with appropriate mappings for {@link FieldInfo}.
+   */
+  public void setFieldInfosItem(int index, String value)
+  {
+
+    try {
+      JSONObject jo = new JSONObject(value);
+      FieldInfo fieldInfo = new FieldInfo(jo.getString("columnName"), jo.getString("pojoFieldExpression"),
+          FieldInfo.SupportType.valueOf(jo.getString("type")));
+      final int need = index - fieldInfos.size() + 1;
+      for (int i = 0; i < need; i++) {
+        fieldInfos.add(null);
+      }
+      fieldInfos.set(index,fieldInfo);
+    } catch (Exception e) {
+      throw new RuntimeException("Exception in setting FieldInfo " + value + " " + e.getMessage());
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOPollInputOperator.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ee77dc65/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcInputOperatorApplicationTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcInputOperatorApplicationTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcInputOperatorApplicationTest.java
new file mode 100644
index 0000000..c59978a
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcInputOperatorApplicationTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.Callable;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Tests to check if setting List of {@link FieldInfo} externally from
+ * configuration file works fine for Jdbc input operators.
+ */
+public class JdbcInputOperatorApplicationTest extends JdbcOperatorTest
+{
+  public static int TupleCount;
+
+  public int getNumOfRowsinTable(String tableName)
+  {
+    Connection con;
+    try {
+      con = DriverManager.getConnection(URL);
+      Statement stmt = con.createStatement();
+      String countQuery = "SELECT count(*) from " + tableName;
+      ResultSet resultSet = stmt.executeQuery(countQuery);
+      resultSet.next();
+      return resultSet.getInt(1);
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching count", e);
+    }
+  }
+
+  public void testApplication(StreamingApplication streamingApplication) throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/JdbcProperties.xml"));
+      lma.prepareDAG(streamingApplication, conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.setHeartbeatMonitoringEnabled(false);
+      ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+      {
+        @Override
+        public Boolean call() throws Exception
+        {
+          return TupleCount == 10;
+        }
+      });
+      lc.run(10000);// runs for 10 seconds and quits
+      Assert.assertEquals("rows in db", TupleCount, getNumOfRowsinTable(TABLE_POJO_NAME));
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+
+  }
+
+  @Test
+  public void testJdbcPOJOPollInputOperatorApplication() throws Exception
+  {
+    testApplication(new JdbcPOJOPollInputOperatorApplication());
+  }
+
+  @Test
+  public void testJdbcPOJOInputOperatorApplication() throws Exception
+  {
+    testApplication(new JdbcPOJOInputOperatorApplication());
+  }
+
+
+  public static class JdbcPOJOInputOperatorApplication implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration configuration)
+    {
+      cleanTable();
+      insertEvents(10, true, 0);
+      JdbcPOJOInputOperator inputOperator = dag.addOperator("JdbcPOJOInput", new JdbcPOJOInputOperator());
+      JdbcStore store = new JdbcStore();
+      store.setDatabaseDriver(DB_DRIVER);
+      store.setDatabaseUrl(URL);
+      inputOperator.setStore(store);
+      inputOperator.setTableName(TABLE_POJO_NAME);
+      inputOperator.setFetchSize(100);
+      dag.getMeta(inputOperator).getMeta(inputOperator.outputPort).getAttributes().put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+      ResultCollector result = dag.addOperator("result", new ResultCollector());
+      dag.addStream("pojo", inputOperator.outputPort, result.input);
+    }
+  }
+
+  public static class JdbcPOJOPollInputOperatorApplication implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration configuration)
+    {
+      cleanTable();
+      insertEvents(10, true, 0);
+      JdbcPOJOPollInputOperator inputOperator = dag.addOperator("JdbcPOJOPollInput", new JdbcPOJOPollInputOperator());
+      JdbcStore store = new JdbcStore();
+      store.setDatabaseDriver(DB_DRIVER);
+      store.setDatabaseUrl(URL);
+      inputOperator.setStore(store);
+      inputOperator.setTableName(TABLE_POJO_NAME);
+      inputOperator.setKey("id");
+      inputOperator.setFetchSize(100);
+      inputOperator.setBatchSize(100);
+      inputOperator.setPartitionCount(2);
+      dag.getMeta(inputOperator).getMeta(inputOperator.outputPort).getAttributes().put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+      ResultCollector result = dag.addOperator("result", new ResultCollector());
+      dag.addStream("pojo", inputOperator.outputPort, result.input);
+    }
+  }
+
+  public static class ResultCollector extends BaseOperator
+  {
+    public final transient DefaultInputPort<java.lang.Object> input = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(java.lang.Object in)
+      {
+        TestPOJOEvent obj = (TestPOJOEvent)in;
+        TupleCount++;
+      }
+    };
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      TupleCount = 0;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ee77dc65/library/src/test/resources/JdbcProperties.xml
----------------------------------------------------------------------
diff --git a/library/src/test/resources/JdbcProperties.xml b/library/src/test/resources/JdbcProperties.xml
index 3e76cf7..431c113 100644
--- a/library/src/test/resources/JdbcProperties.xml
+++ b/library/src/test/resources/JdbcProperties.xml
@@ -48,6 +48,137 @@
     </value>
   </property>
 
+  <property>
+    <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[0]</name>
+    <value>
+      {
+      "columnName":"ID",
+      "pojoFieldExpression": "id",
+      "type":"INTEGER"
+      }
+
+    </value>
+  </property>
+
+
+  <property>
+    <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[1]</name>
+    <value>
+      {
+      "columnName":"STARTDATE",
+      "pojoFieldExpression": "startDate",
+      "type":"STRING"
+      }
+
+    </value>
+  </property>
+  <property>
+    <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[2]</name>
+    <value>
+      {
+      "columnName":"STARTTIME",
+      "pojoFieldExpression": "startTime",
+      "type":"STRING"
+      }
+
+    </value>
+  </property>
+
+  <property>
+    <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[3]</name>
+    <value>
+      {
+      "columnName":"STARTTIMESTAMP",
+      "pojoFieldExpression": "startTimestamp",
+      "type":"STRING"
+      }
+
+    </value>
+  </property>
+
+  <property>
+    <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[4]</name>
+    <value>
+      {
+      "columnName":"SCORE",
+      "pojoFieldExpression": "score",
+      "type":"DOUBLE"
+      }
+
+    </value>
+  </property>
+
+  <property>
+    <name>dt.operator.JdbcPOJOPollInput.columnsExpression</name>
+    <value>
+      id,startDate,startTime,startTimestamp,score
+    </value>
+  </property>
+
+
+
+  <property>
+    <name>dt.operator.JdbcPOJOInput.fieldInfosItem[0]</name>
+    <value>
+      {
+      "columnName":"ID",
+      "pojoFieldExpression": "id",
+      "type":"INTEGER"
+      }
+
+    </value>
+  </property>
+
+
+  <property>
+    <name>dt.operator.JdbcPOJOInput.fieldInfosItem[1]</name>
+    <value>
+      {
+      "columnName":"STARTDATE",
+      "pojoFieldExpression": "startDate",
+      "type":"STRING"
+      }
+
+    </value>
+  </property>
+  <property>
+    <name>dt.operator.JdbcPOJOInput.fieldInfosItem[2]</name>
+    <value>
+      {
+      "columnName":"STARTTIME",
+      "pojoFieldExpression": "startTime",
+      "type":"STRING"
+      }
+
+    </value>
+  </property>
+
+  <property>
+    <name>dt.operator.JdbcPOJOInput.fieldInfosItem[3]</name>
+    <value>
+      {
+      "columnName":"STARTTIMESTAMP",
+      "pojoFieldExpression": "startTimestamp",
+      "type":"STRING"
+      }
+
+    </value>
+  </property>
+
+  <property>
+    <name>dt.operator.JdbcPOJOInput.fieldInfosItem[4]</name>
+    <value>
+      {
+      "columnName":"SCORE",
+      "pojoFieldExpression": "score",
+      "type":"DOUBLE"
+      }
+
+    </value>
+  </property>
+
+
+
 
 
 </configuration>