You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:26 UTC
[25/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/edgent/connectors/jdbc/ResultsHandler.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/ResultsHandler.java b/connectors/jdbc/src/main/java/edgent/connectors/jdbc/ResultsHandler.java
deleted file mode 100644
index 1b5f74c..0000000
--- a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/ResultsHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.jdbc;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import edgent.function.Consumer;
-
-/**
- * Handle the results of executing an SQL statement.
- * <p>
- * Sample use:
- * <br>
- * For a ResultSet created by executing the SQL statement:
- * <br>
- * {@code "SELECT id, firstname, lastname FROM persons WHERE id = ?"}
- * <pre>{@code
- * // create a Person tuple from db person info and add it to a stream
- * ResultsHandler<PersonId,Person> rh =
- * (tuple,rs,exc,consumer) -> {
- * if (exc != null)
- * return;
- * rs.next();
- * int id = rs.getInt("id");
- * String firstName = rs.getString("firstname");
- * String lastName = rs.getString("lastname");
- * consumer.accept(new Person(id, firstName, lastName));
- * }
- * }
- * };
- * }</pre>
- *
- * @param <T> type of the tuple inducing the SQL statement execution / results
- * @param <R> type of tuple of a result stream consumer
- */
-@FunctionalInterface
-public interface ResultsHandler<T,R> {
- /**
- * Process the {@code ResultSet} and add 0 or more tuples to {@code consumer}.
- * @param tuple the tuple that induced the resultSet
- * @param resultSet the SQL statement's result set. null if {@code exc}
- * is non-null or if the statement doesn't generate a {@code ResultSet}.
- * @param exc non-null if there was an exception executing the statement.
- * Typically a SQLException.
- * @param consumer a Consumer to a result stream.
- * @throws SQLException if there are problems handling the result
- */
- public void handleResults(T tuple, ResultSet resultSet, Exception exc, Consumer<R> consumer) throws SQLException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/edgent/connectors/jdbc/StatementSupplier.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/StatementSupplier.java b/connectors/jdbc/src/main/java/edgent/connectors/jdbc/StatementSupplier.java
deleted file mode 100644
index 2c00791..0000000
--- a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/StatementSupplier.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.jdbc;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-/**
- * Function that supplies a JDBC SQL {@link java.sql.PreparedStatement}.
- */
-@FunctionalInterface
-public interface StatementSupplier {
- /**
- * Create a JDBC SQL PreparedStatement containing 0 or more parameters.
- * <p>
- * Sample use:
- * <pre>{@code
- * StatementSupplier ss =
- * (cn) -> cn.prepareStatement("SELECT id, firstname, lastname"
- * + " FROM persons WHERE id = ?");
- * }</pre>
- * @param cn JDBC connection
- * @return the PreparedStatement
- * @throws SQLException on failure
- */
- PreparedStatement get(Connection cn) throws SQLException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/edgent/connectors/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/package-info.java b/connectors/jdbc/src/main/java/edgent/connectors/jdbc/package-info.java
deleted file mode 100644
index 6320fb2..0000000
--- a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * JDBC based database stream connector.
- * <p>
- * Stream tuples may be written to databases
- * and created or enriched by reading from databases.
- */
-package edgent.connectors.jdbc;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcConnector.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcConnector.java b/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcConnector.java
deleted file mode 100644
index 946bc8a..0000000
--- a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcConnector.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.jdbc.runtime;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.SQLTransientException;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.sql.DataSource;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import edgent.connectors.jdbc.CheckedFunction;
-import edgent.connectors.jdbc.CheckedSupplier;
-
-public class JdbcConnector {
-
- private static final Logger logger = LoggerFactory.getLogger(JdbcConnector.class);
- private final CheckedSupplier<DataSource> dataSourceFn;
- private final CheckedFunction<DataSource,Connection> connFn;
- private DataSource ds;
- private final Map<JdbcStatement<?,?>,Connection> cnMap = new HashMap<>();
-
- public JdbcConnector(CheckedSupplier<DataSource> dataSourceFn, CheckedFunction<DataSource,Connection> connFn) {
- this.dataSourceFn = dataSourceFn;
- this.connFn = connFn;
- }
-
- Logger getLogger() {
- return logger;
- }
-
- void unregister(JdbcStatement<?,?> oplet) {
- logger.trace("unregistering statement");
- closeCn(oplet);
- }
-
- private DataSource getDataSource() throws Exception {
- if (ds == null) {
- logger.trace("getting DataSource");
- ds = dataSourceFn.get();
- }
- return ds;
- }
-
- synchronized Connection getConnection(JdbcStatement<?,?> oplet) throws Exception {
- // Apparently a bad idea for multiple threads (operators
- // in our case) to use a single Connection instance.
- Connection cn = cnMap.get(oplet);
- if (cn == null) {
- try {
- logger.trace("getting jdbc connection");
- cn = connFn.apply(getDataSource());
- cnMap.put(oplet, cn);
- }
- catch (Exception e) {
- logger.error("unable to connect", e);
- throw e;
- }
- }
- return cn;
- }
-
- void statementFailed(JdbcStatement<?,?> oplet, Exception e) {
- logger.error("statement failed", e);
- if (!(e instanceof SQLTransientException)) {
- closeCn(oplet);
- }
- }
-
- private synchronized void closeCn(JdbcStatement<?,?> oplet) {
- try {
- Connection cn = cnMap.remove(oplet);
- if (cn != null) {
- logger.trace("closing jdbc connection");
- cn.close();
- }
- }
- catch (SQLException e) {
- logger.error("jdbc close cn failed", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcStatement.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcStatement.java b/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcStatement.java
deleted file mode 100644
index 7d7ec9e..0000000
--- a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcStatement.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.jdbc.runtime;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-
-import edgent.connectors.jdbc.ParameterSetter;
-import edgent.connectors.jdbc.ResultsHandler;
-import edgent.connectors.jdbc.StatementSupplier;
-import edgent.function.Consumer;
-import edgent.function.Function;
-
-public class JdbcStatement<T,R> implements Function<T,Iterable<R>>,Consumer<T>,AutoCloseable {
- private static final long serialVersionUID = 1L;
- private final Logger logger;
- private final JdbcConnector connector;
- private final StatementSupplier stmtSupplier;
- private final ParameterSetter<T> paramSetter;
- private final ResultsHandler<T,R> resultsHandler;
- private PreparedStatement stmt;
- private long nTuples;
- private long nTuplesFailed;
-
- private void closeStmt() {
- if (stmt != null) {
- logger.trace("closing statement");
- PreparedStatement tmp = stmt;
- stmt = null;
- try {
- tmp.close();
- }
- catch (SQLException e) {
- logger.error("close stmt failed", e);
- }
- }
- }
-
- public JdbcStatement(JdbcConnector connector,
- StatementSupplier stmtSupplier, ParameterSetter<T> paramSetter,
- ResultsHandler<T,R> resultsHandler) {
- this.logger = connector.getLogger();
- this.connector = connector;
- this.stmtSupplier = stmtSupplier;
- this.paramSetter = paramSetter;
- this.resultsHandler = resultsHandler;
- }
-
- public JdbcStatement(JdbcConnector connector,
- StatementSupplier stmtSupplier, ParameterSetter<T> paramSetter) {
- this(connector, stmtSupplier, paramSetter, null);
- }
-
- @Override
- public void accept(T tuple) {
- executeStatement(tuple, null);
- }
-
- @Override
- public Iterable<R> apply(T tuple) {
- // lame impl for large result sets but will do for now.
- List<R> results = new ArrayList<>();
- executeStatement(tuple, results);
- return results;
- }
-
- private void executeStatement(T tuple, List<R> results) {
- nTuples++;
- try {
- logger.debug("executing statement nTuples={} nTuplesFailed={}", nTuples, nTuplesFailed);
- Connection cn = connector.getConnection(this);
- PreparedStatement stmt = getPreparedStatement(cn);
- paramSetter.setParameters(tuple, stmt);
- boolean hasResult = stmt.execute();
- if (resultsHandler != null) {
- if (!hasResult) {
- resultsHandler.handleResults(tuple, null/*rs*/, null/*exc*/,
- (result) -> results.add(result));
- }
- else {
- do {
- try (ResultSet rs = stmt.getResultSet()) {
- resultsHandler.handleResults(tuple, rs, null/*exc*/,
- (result) -> results.add(result));
- }
- } while (stmt.getMoreResults());
- }
- }
- }
- catch (Exception e) {
- nTuplesFailed++;
- logger.trace("executing statement failed nTuples={} nTuplesFailed={}", nTuples, nTuplesFailed);
- if (resultsHandler != null) {
- try {
- resultsHandler.handleResults(tuple, null/*rs*/, e,
- (result) -> results.add(result));
- }
- catch (Exception e2) {
- logger.error("failure result handler failed", e2);
- }
- }
- closeStmt();
- connector.statementFailed(this, e);
- }
- }
-
- private PreparedStatement getPreparedStatement(Connection cn) throws SQLException {
- if (stmt == null) {
- stmt = stmtSupplier.get(cn);
- }
- return stmt;
- }
-
- @Override
- public void close() throws Exception {
- closeStmt();
- connector.unregister(this);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedFunction.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedFunction.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedFunction.java
new file mode 100644
index 0000000..93ce314
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedFunction.java
@@ -0,0 +1,36 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc;
+
+/**
+ * Function to apply a funtion to an input value and return a result.
+ *
+ * @param <T> input stream tuple type
+ * @param <R> result stream tuple type
+ */
+@FunctionalInterface
+public interface CheckedFunction<T,R> {
+ /**
+ * Apply a function to {@code t} and return the result.
+ * @param t input value
+ * @return the function result.
+ * @throws Exception if there are processing errors.
+ */
+ R apply(T t) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedSupplier.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedSupplier.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedSupplier.java
new file mode 100644
index 0000000..7924c9e
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedSupplier.java
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc;
+
+/**
+ * Function that supplies a result and may throw an Exception.
+ *
+ * @param <T> stream tuple type
+ */
+@FunctionalInterface
+public interface CheckedSupplier<T> {
+ /**
+ * Get a result.
+ * @return the result
+ * @throws Exception if there are errors
+ */
+ T get() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/JdbcStreams.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/JdbcStreams.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/JdbcStreams.java
new file mode 100644
index 0000000..aad6316
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/JdbcStreams.java
@@ -0,0 +1,294 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc;
+
+import java.sql.Connection;
+
+import javax.sql.DataSource;
+
+import org.apache.edgent.connectors.jdbc.runtime.JdbcConnector;
+import org.apache.edgent.connectors.jdbc.runtime.JdbcStatement;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * {@code JdbcStreams} is a streams connector to a database via the
+ * JDBC API {@code java.sql} package.
+ * <p>
+ * The connector provides general SQL access to a database, enabling
+ * writing of a stream's tuples to a database, creating a stream from
+ * database query results, and other operations.
+ * Knowledge of the JDBC API is required.
+ * <p>
+ * Use of the connector involves:
+ * <ul>
+ * <li>constructing a streams connector to a database by providing it with:
+ * <ul>
+ * <li>a JDBC {@link javax.sql.DataSource}</li>
+ * <li>a function that creates a JDBC {@link java.sql.Connection}
+ * from the {@code DataSource}</li>
+ * </ul>
+ * </li>
+ * <li>defining SQL statement executions and results handling by calling one
+ * of the {@code executeStatement()} methods:
+ * <ul>
+ * <li>specify an SQL statement String or define a {@link StatementSupplier}.
+ * A {@code StatementSupplier}
+ * creates a JDBC {@link java.sql.PreparedStatement} for an SQL statement
+ * (e.g., a query, insert, update, etc operation).</li>
+ * <li>define a {@link ParameterSetter}. A {@code ParameterSetter}
+ * sets the parameter values in a generic {@code PreparedStatement}.</li>
+ * <li>define a {@link ResultsHandler} as required.
+ * A {@code ResultsHandler} processes a JDBC
+ * {@link java.sql.ResultSet} created by executing a SQL statement,
+ * optionally creating one or more tuples from the results
+ * and adding them to a stream.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p>
+ * Sample use:
+ * <pre>{@code
+ * // construct a connector to the database
+ * JdbcStreams mydb = new JdbcStreams(
+ * // fn to create the javax.sql.DataSource to the db
+ * () -> {
+ * Context ctx = new javax.naming.InitialContext();
+ * return (DataSource) ctx.lookup("jdbc/myDb");
+ * },
+ * // fn to connect to the db (via the DataSource)
+ * (dataSource,cn) -> dataSource.getConnection(username,pw)
+ * );
+ *
+ * // ----------------------------------------------------
+ * //
+ * // Write a Person stream to a table
+ * //
+ * TStream<Person> persons = ...
+ * TSink sink = mydb.executeStatement(persons,
+ * () -> "INSERT INTO persons VALUES(?,?,?)",
+ * (person,stmt) -> {
+ * stmt.setInt(1, person.getId());
+ * stmt.setString(2, person.getFirstName());
+ * stmt.setString(3, person.getLastName());
+ * },
+ * );
+ *
+ * // ----------------------------------------------------
+ * //
+ * // Create a stream of Person from a PersonId tuple
+ * //
+ * TStream<PersonId> personIds = ...
+ * TStream<Person> persons = mydb.executeStatement(personIds,
+ * () -> "SELECT id, firstname, lastname FROM persons WHERE id = ?",
+ * (personId,stmt) -> stmt.setInt(1,personId.getId()),
+ * (personId,rs,exc,consumer) -> {
+ * if (exc != null) {
+ * // statement failed, do something
+ * int ecode = exc.getErrorCode();
+ * String state = exc.getSQLState();
+ * ... // consumer.accept(...) if desired.
+ * }
+ * else {
+ * rs.next();
+ * int id = resultSet.getInt("id");
+ * String firstName = resultSet.getString("firstname");
+ * String lastName = resultSet.getString("lastname");
+ * consumer.accept(new Person(id, firstName, lastName));
+ * }
+ * }
+ * );
+ * persons.print();
+ *
+ * // ----------------------------------------------------
+ * //
+ * // Delete all the rows from a table
+ * //
+ * TStream<String> beacon = topology.strings("once");
+ * mydb.executeStatement(beacon,
+ * () -> "DELETE FROM persons",
+ * (tuple,stmt) -> { } // no params to set
+ * );
+ * }</pre>
+ */
+public class JdbcStreams {
+ @SuppressWarnings("unused")
+ private final Topology top;
+ private final JdbcConnector connector;
+
+ /**
+ * Create a connector that uses a JDBC {@link DataSource} object to get
+ * a database connection.
+ * <p>
+ * In some environments it's common for JDBC DataSource objects to
+ * have been registered in JNDI. In such cases the dataSourceFn can be:
+ * <pre>{@code
+ * () -> { Context ctx = new javax.naming.InitialContext();
+ * return (DataSource) ctx.lookup("jdbc/" + logicalDbName);
+ * }
+ * }</pre>
+ * <p>
+ * Alternatively, a DataSource can be created using a dbms implementation's
+ * DataSource class.
+ * For example:
+ * <pre>{@code
+ * () -> { EmbeddedDataSource ds = new org.apache.derby.jdbc.EmbeddedDataSource();
+ * ds.setDatabaseName(dbName);
+ * ds.setCreateDatabase("create");
+ * return ds;
+ * }
+ * }</pre>
+ * <p>
+ * Once {@code dataSourceFn} returns a DataSource it will not be called again.
+ * <p>
+ * {@code connFn} is called only if a new JDBC connection is needed.
+ * It is not called per-processed-tuple. JDBC failures in
+ * {@code executeStatement()} can result in a JDBC connection getting
+ * closed and {@code connFn} is subsequently called to reconnect.
+ *
+ * @param topology topology that this connector is for
+ * @param dataSourceFn function that yields the {@link DataSource}
+ * for the database.
+ * @param connFn function that yields a {@link Connection} from a {@code DataSource}.
+ */
+ public JdbcStreams(Topology topology, CheckedSupplier<DataSource> dataSourceFn, CheckedFunction<DataSource,Connection> connFn) {
+ this.top = topology;
+ this.connector = new JdbcConnector(dataSourceFn, connFn);
+ }
+
+ /**
+ * For each tuple on {@code stream} execute an SQL statement and
+ * add 0 or more resulting tuples to a result stream.
+ * <p>
+ * Same as using {@link #executeStatement(TStream, StatementSupplier, ParameterSetter, ResultsHandler)}
+ * specifying {@code dataSource -> dataSource.prepareStatement(stmtSupplier.get()}}
+ * for the {@code StatementSupplier}.
+ *
+ * @param <T> Tuple type for input stream
+ * @param <R> Tuple type of result stream
+ * @param stream tuples to execute a SQL statement on behalf of
+ * @param stmtSupplier an SQL statement
+ * @param paramSetter function to set SQL statement parameters
+ * @param resultsHandler SQL ResultSet handler
+ * @return result Stream
+ */
+ public <T,R> TStream<R> executeStatement(TStream<T> stream,
+ Supplier<String> stmtSupplier,
+ ParameterSetter<T> paramSetter,
+ ResultsHandler<T,R> resultsHandler
+ ) {
+ return stream.flatMap(new JdbcStatement<T,R>(connector,
+ cn -> cn.prepareStatement(stmtSupplier.get()),
+ paramSetter, resultsHandler));
+ }
+
+ /**
+ * For each tuple on {@code stream} execute an SQL statement and
+ * add 0 or more resulting tuples to a result stream.
+ * <p>
+ * Use to transform T tuples to R tuples, or
+ * enrich/update T tuples with additional information from a database.
+ * It can also be used to load a table into stream,
+ * using a T to trigger that.
+ * Or to execute non-ResultSet generating
+ * SQL statements and receive failure info and/or generate tuple(s)
+ * upon completion.
+ * <p>
+ * {@code stmtSupplier} is called only once per new JDBC connection/reconnect.
+ * It is not called per-tuple. Hence, with the exception of statement
+ * parameters, the returned statement is expected to be unchanging.
+ * Failures executing a statement can result in the connection getting
+ * closed and subsequently reconnected, resulting in another
+ * {@code stmtSupplier} call.
+ * <p>
+ * {@code resultsHandler} is called for every tuple.
+ * If {@code resultsHandler} throws an Exception, it is called a
+ * second time for the tuple with a non-null exception argument.
+ *
+ * @param <T> Tuple type for input stream
+ * @param <R> Tuple type of result stream
+ * @param stream tuples to execute a SQL statement on behalf of
+ * @param stmtSupplier an SQL statement
+ * @param paramSetter function to set SQL statement parameters
+ * @param resultsHandler SQL ResultSet handler
+ * @return result Stream
+ * @see #executeStatement(TStream, Supplier, ParameterSetter, ResultsHandler)
+ */
+ public <T,R> TStream<R> executeStatement(TStream<T> stream,
+ StatementSupplier stmtSupplier,
+ ParameterSetter<T> paramSetter,
+ ResultsHandler<T,R> resultsHandler
+ ) {
+ return stream.flatMap(new JdbcStatement<T,R>(connector,
+ stmtSupplier, paramSetter, resultsHandler));
+ }
+
+ /**
+ * For each tuple on {@code stream} execute an SQL statement.
+ * <p>
+ * Same as using {@link #executeStatement(TStream, StatementSupplier, ParameterSetter)}
+ * specifying {@code dataSource -> dataSource.prepareStatement(stmtSupplier.get()}}
+ * for the {@code StatementSupplier}.
+ *
+ * @param <T> Tuple type
+ * @param stream tuples to execute a SQL statement on behalf of
+ * @param stmtSupplier an SQL statement
+ * @param paramSetter function to set SQL statement parameters
+ * @return TSink sink element representing termination of this stream.
+ */
+ public <T> TSink<T> executeStatement(TStream<T> stream,
+ Supplier<String> stmtSupplier,
+ ParameterSetter<T> paramSetter
+ ) {
+ return stream.sink(new JdbcStatement<T,Object>(connector,
+ cn -> cn.prepareStatement(stmtSupplier.get()),
+ paramSetter));
+ }
+
+ /**
+ * For each tuple on {@code stream} execute an SQL statement.
+ * <p>
+ * Use to write a stream of T to a table.
+ * More generally, use a T as a trigger to execute some SQL statement
+ * that doesn't yield a ResultSet.
+ * <p>
+ * Use a non-sink form of {@code executeStatement()} (forms
+ * that take a {@code ResultsHandler}), if you want to:
+ * <ul>
+ * <li>be notified of statement execution failures</li>
+ * <li>generate tuple(s) after the statement has run.</li>
+ * </ul>
+ *
+ * @param <T> Tuple type
+ * @param stream tuples to execute a SQL statement on behalf of
+ * @param stmtSupplier an SQL statement
+ * @param paramSetter function to set SQL statement parameters
+ * @return TSink sink element representing termination of this stream.
+ * @see #executeStatement(TStream, Supplier, ParameterSetter)
+ */
+ public <T> TSink<T> executeStatement(TStream<T> stream,
+ StatementSupplier stmtSupplier,
+ ParameterSetter<T> paramSetter
+ ) {
+ return stream.sink(new JdbcStatement<T,Object>(connector,
+ stmtSupplier, paramSetter));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ParameterSetter.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ParameterSetter.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ParameterSetter.java
new file mode 100644
index 0000000..e38ab11
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ParameterSetter.java
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Function that sets parameters in a JDBC SQL {@link java.sql.PreparedStatement}.
+ *
+ * @param <T> stream tuple type
+ */
+@FunctionalInterface
+public interface ParameterSetter<T> {
+ /**
+ * Set 0 or more parameters in a JDBC PreparedStatement.
+ * <p>
+ * Sample use for a PreparedStatement of:
+ * <br>
+ * {@code "SELECT id, firstname, lastname FROM persons WHERE id = ?"}
+ * <pre>{@code
+ * ParameterSetter<PersonId> ps = (personId,stmt) -> stmt.setInt(1, personId.getId());
+ * }</pre>
+ *
+ * @param t stream tuple of type T
+ * @param stmt PreparedStatement
+ * @throws SQLException on failure
+ */
+ void setParameters(T t, PreparedStatement stmt) throws SQLException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ResultsHandler.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ResultsHandler.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ResultsHandler.java
new file mode 100644
index 0000000..668b6ca
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ResultsHandler.java
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.edgent.function.Consumer;
+
+/**
+ * Handle the results of executing an SQL statement.
+ * <p>
+ * Sample use:
+ * <br>
+ * For a ResultSet created by executing the SQL statement:
+ * <br>
+ * {@code "SELECT id, firstname, lastname FROM persons WHERE id = ?"}
+ * <pre>{@code
+ * // create a Person tuple from db person info and add it to a stream
+ * ResultsHandler<PersonId,Person> rh =
+ * (tuple,rs,exc,consumer) -> {
+ * if (exc != null)
+ * return;
+ * rs.next();
+ * int id = rs.getInt("id");
+ * String firstName = rs.getString("firstname");
+ * String lastName = rs.getString("lastname");
+ * consumer.accept(new Person(id, firstName, lastName));
+ * }
+ * }
+ * };
+ * }</pre>
+ *
+ * @param <T> type of the tuple inducing the SQL statement execution / results
+ * @param <R> type of tuple of a result stream consumer
+ */
+@FunctionalInterface
+public interface ResultsHandler<T,R> {
+ /**
+ * Process the {@code ResultSet} and add 0 or more tuples to {@code consumer}.
+ * @param tuple the tuple that induced the resultSet
+ * @param resultSet the SQL statement's result set. null if {@code exc}
+ * is non-null or if the statement doesn't generate a {@code ResultSet}.
+ * @param exc non-null if there was an exception executing the statement.
+ * Typically a SQLException.
+ * @param consumer a Consumer to a result stream.
+ * @throws SQLException if there are problems handling the result
+ */
+ public void handleResults(T tuple, ResultSet resultSet, Exception exc, Consumer<R> consumer) throws SQLException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/StatementSupplier.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/StatementSupplier.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/StatementSupplier.java
new file mode 100644
index 0000000..aa2137a
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/StatementSupplier.java
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Function that supplies a JDBC SQL {@link java.sql.PreparedStatement}.
+ */
+@FunctionalInterface
+public interface StatementSupplier {
+ /**
+ * Create a JDBC SQL PreparedStatement containing 0 or more parameters.
+ * <p>
+ * Sample use:
+ * <pre>{@code
+ * StatementSupplier ss =
+ * (cn) -> cn.prepareStatement("SELECT id, firstname, lastname"
+ * + " FROM persons WHERE id = ?");
+ * }</pre>
+ * @param cn JDBC connection
+ * @return the PreparedStatement
+ * @throws SQLException on failure
+ */
+ PreparedStatement get(Connection cn) throws SQLException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/package-info.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/package-info.java
new file mode 100644
index 0000000..16fd25d
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/package-info.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * JDBC based database stream connector.
+ * <p>
+ * Stream tuples may be written to databases
+ * and created or enriched by reading from databases.
+ */
+package org.apache.edgent.connectors.jdbc;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcConnector.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcConnector.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcConnector.java
new file mode 100644
index 0000000..93c8287
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcConnector.java
@@ -0,0 +1,101 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc.runtime;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLTransientException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.apache.edgent.connectors.jdbc.CheckedFunction;
+import org.apache.edgent.connectors.jdbc.CheckedSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcConnector {
+
+ private static final Logger logger = LoggerFactory.getLogger(JdbcConnector.class);
+ private final CheckedSupplier<DataSource> dataSourceFn;
+ private final CheckedFunction<DataSource,Connection> connFn;
+ private DataSource ds;
+ private final Map<JdbcStatement<?,?>,Connection> cnMap = new HashMap<>();
+
+ public JdbcConnector(CheckedSupplier<DataSource> dataSourceFn, CheckedFunction<DataSource,Connection> connFn) {
+ this.dataSourceFn = dataSourceFn;
+ this.connFn = connFn;
+ }
+
+ Logger getLogger() {
+ return logger;
+ }
+
+ void unregister(JdbcStatement<?,?> oplet) {
+ logger.trace("unregistering statement");
+ closeCn(oplet);
+ }
+
+ private DataSource getDataSource() throws Exception {
+ if (ds == null) {
+ logger.trace("getting DataSource");
+ ds = dataSourceFn.get();
+ }
+ return ds;
+ }
+
+ synchronized Connection getConnection(JdbcStatement<?,?> oplet) throws Exception {
+ // Apparently a bad idea for multiple threads (operators
+ // in our case) to use a single Connection instance.
+ Connection cn = cnMap.get(oplet);
+ if (cn == null) {
+ try {
+ logger.trace("getting jdbc connection");
+ cn = connFn.apply(getDataSource());
+ cnMap.put(oplet, cn);
+ }
+ catch (Exception e) {
+ logger.error("unable to connect", e);
+ throw e;
+ }
+ }
+ return cn;
+ }
+
+ void statementFailed(JdbcStatement<?,?> oplet, Exception e) {
+ logger.error("statement failed", e);
+ if (!(e instanceof SQLTransientException)) {
+ closeCn(oplet);
+ }
+ }
+
+ private synchronized void closeCn(JdbcStatement<?,?> oplet) {
+ try {
+ Connection cn = cnMap.remove(oplet);
+ if (cn != null) {
+ logger.trace("closing jdbc connection");
+ cn.close();
+ }
+ }
+ catch (SQLException e) {
+ logger.error("jdbc close cn failed", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcStatement.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcStatement.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcStatement.java
new file mode 100644
index 0000000..7c557ec
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcStatement.java
@@ -0,0 +1,141 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc.runtime;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.edgent.connectors.jdbc.ParameterSetter;
+import org.apache.edgent.connectors.jdbc.ResultsHandler;
+import org.apache.edgent.connectors.jdbc.StatementSupplier;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.slf4j.Logger;
+
+public class JdbcStatement<T,R> implements Function<T,Iterable<R>>,Consumer<T>,AutoCloseable {
+ private static final long serialVersionUID = 1L;
+ private final Logger logger;
+ private final JdbcConnector connector;
+ private final StatementSupplier stmtSupplier;
+ private final ParameterSetter<T> paramSetter;
+ private final ResultsHandler<T,R> resultsHandler;
+ private PreparedStatement stmt;
+ private long nTuples;
+ private long nTuplesFailed;
+
+ private void closeStmt() {
+ if (stmt != null) {
+ logger.trace("closing statement");
+ PreparedStatement tmp = stmt;
+ stmt = null;
+ try {
+ tmp.close();
+ }
+ catch (SQLException e) {
+ logger.error("close stmt failed", e);
+ }
+ }
+ }
+
+ public JdbcStatement(JdbcConnector connector,
+ StatementSupplier stmtSupplier, ParameterSetter<T> paramSetter,
+ ResultsHandler<T,R> resultsHandler) {
+ this.logger = connector.getLogger();
+ this.connector = connector;
+ this.stmtSupplier = stmtSupplier;
+ this.paramSetter = paramSetter;
+ this.resultsHandler = resultsHandler;
+ }
+
+ public JdbcStatement(JdbcConnector connector,
+ StatementSupplier stmtSupplier, ParameterSetter<T> paramSetter) {
+ this(connector, stmtSupplier, paramSetter, null);
+ }
+
+ @Override
+ public void accept(T tuple) {
+ executeStatement(tuple, null);
+ }
+
+ @Override
+ public Iterable<R> apply(T tuple) {
+ // lame impl for large result sets but will do for now.
+ List<R> results = new ArrayList<>();
+ executeStatement(tuple, results);
+ return results;
+ }
+
+ private void executeStatement(T tuple, List<R> results) {
+ nTuples++;
+ try {
+ logger.debug("executing statement nTuples={} nTuplesFailed={}", nTuples, nTuplesFailed);
+ Connection cn = connector.getConnection(this);
+ PreparedStatement stmt = getPreparedStatement(cn);
+ paramSetter.setParameters(tuple, stmt);
+ boolean hasResult = stmt.execute();
+ if (resultsHandler != null) {
+ if (!hasResult) {
+ resultsHandler.handleResults(tuple, null/*rs*/, null/*exc*/,
+ (result) -> results.add(result));
+ }
+ else {
+ do {
+ try (ResultSet rs = stmt.getResultSet()) {
+ resultsHandler.handleResults(tuple, rs, null/*exc*/,
+ (result) -> results.add(result));
+ }
+ } while (stmt.getMoreResults());
+ }
+ }
+ }
+ catch (Exception e) {
+ nTuplesFailed++;
+ logger.trace("executing statement failed nTuples={} nTuplesFailed={}", nTuples, nTuplesFailed);
+ if (resultsHandler != null) {
+ try {
+ resultsHandler.handleResults(tuple, null/*rs*/, e,
+ (result) -> results.add(result));
+ }
+ catch (Exception e2) {
+ logger.error("failure result handler failed", e2);
+ }
+ }
+ closeStmt();
+ connector.statementFailed(this, e);
+ }
+ }
+
+ private PreparedStatement getPreparedStatement(Connection cn) throws SQLException {
+ if (stmt == null) {
+ stmt = stmtSupplier.get(cn);
+ }
+ return stmt;
+ }
+
+ @Override
+ public void close() throws Exception {
+ closeStmt();
+ connector.unregister(this);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java b/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java
deleted file mode 100644
index ba2dfa7..0000000
--- a/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.jdbc;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * JdbcStreams connector globalization tests.
- * <p>
- * The tests use Apache Embedded Derby as the backing dbms.
- * The Oracle JDK includes Derby in $JAVA_HOME/db.
- * Manually install Derby for other JDKs if required.
- * Arrange for the classpath to be configured by one of:
- * <ul>
- * <li>set the DERBY_HOME environment variable. build.xml adds
- * $DERBY_HOME/lib/derby.jar to the classpath when running the tests.</li>
- * <li>manually add derby.jar to the classpath</li>
- * </ul>
- * The tests are "skipped" if the dbms's jdbc driver can't be found.
- */
-public class JdbcStreamsGlobalTest extends JdbcStreamsTest {
-
- private static final List<Person> globalPersonList = new ArrayList<>();
- static {
- globalPersonList.add(new Person(1, "\u7ea6\u7ff0", "\u674e", "male", 35));
- globalPersonList.add(new Person(2, "\u7b80", "\u674e", "female", 29));
- globalPersonList.add(new Person(3, "\u6bd4\u5229", "\u5468", "male", 3));
- }
- private static final List<PersonId> globalPersonIdList = new ArrayList<>();
- static {
- for(Person p : globalPersonList) {
- globalPersonIdList.add(new PersonId(p.id));
- }
- }
-
- public List<Person> getPersonList() {
- return globalPersonList;
- }
-
- public List<PersonId> getPersonIdList() {
- return globalPersonIdList;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsTest.java b/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsTest.java
deleted file mode 100644
index b24c7e7..0000000
--- a/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsTest.java
+++ /dev/null
@@ -1,635 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.jdbc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assume.assumeTrue;
-
-import java.lang.reflect.Method;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import javax.sql.DataSource;
-
-import org.junit.Test;
-
-import edgent.connectors.jdbc.JdbcStreams;
-import edgent.test.connectors.common.ConnectorTestBase;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-
-/**
- * JdbcStreams connector tests.
- * <p>
- * The tests use Apache Embedded Derby as the backing dbms.
- * The Oracle JDK includes Derby in $JAVA_HOME/db.
- * Manually install Derby for other JDKs if required.
- * Arrange for the classpath to be configured by one of:
- * <ul>
- * <li>set the DERBY_HOME environment variable. build.xml adds
- * $DERBY_HOME/lib/derby.jar to the classpath when running the tests.</li>
- * <li>manually add derby.jar to the classpath</li>
- * </ul>
- * The tests are "skipped" if the dbms's jdbc driver can't be found.
- */
-public class JdbcStreamsTest extends ConnectorTestBase {
-
- private static final int SEC_TIMEOUT = 10;
- private final static String DB_NAME = "JdbcStreamsTestDb";
- private final static String USERNAME = System.getProperty("user.name");
- private final static String PW = "none";
- private static final List<Person> personList = new ArrayList<>();
- static {
- personList.add(new Person(1, "John", "Doe", "male", 35));
- personList.add(new Person(2, "Jane", "Doe", "female", 29));
- personList.add(new Person(3, "Billy", "McDoe", "male", 3));
- }
- private static final List<PersonId> personIdList = new ArrayList<>();
- static {
- for(Person p : personList) {
- personIdList.add(new PersonId(p.id));
- }
- }
-
- static class Person {
- int id;
- String firstName;
- String lastName;
- String gender;
- int age;
- Person(int id, String first, String last, String gender, int age) {
- this.id = id;
- this.firstName = first;
- this.lastName = last;
- this.gender = gender;
- this.age = age;
- }
- public String toString() {
- return String.format("id=%d first=%s last=%s gender=%s age=%d",
- id, firstName, lastName, gender, age);
- }
- }
-
- static class PersonId {
- int id;
- PersonId(int id) {
- this.id = id;
- }
- public String toString() {
- return String.format("id=%d", id);
- }
- }
-
- public List<Person> getPersonList() {
- return personList;
- }
-
- public List<PersonId> getPersonIdList() {
- return personIdList;
- }
-
- DataSource getDerbyEmbeddedDataSource(String database) throws Exception
- {
- // Avoid a compile-time dependency to the jdbc driver.
- // At runtime, require that the classpath can find it.
- // e.g., build.xml adds $DERBY_HOME/lib/derby.jar to the test classpath
-
- String DERBY_DATA_SOURCE = "org.apache.derby.jdbc.EmbeddedDataSource";
-
- Class<?> nsDataSource = null;
- try {
- nsDataSource = Class.forName(DERBY_DATA_SOURCE);
- }
- catch (ClassNotFoundException e) {
- String msg = "Fix the test classpath. ";
- if (System.getenv("DERBY_HOME") == null) {
- msg += "DERBY_HOME not set. ";
- }
- msg += "Class not found: "+e.getLocalizedMessage();
- System.err.println(msg);
- assumeTrue(false);
- }
- DataSource ds = (DataSource) nsDataSource.newInstance();
-
- @SuppressWarnings("rawtypes")
- Class[] methodParams = new Class[] {String.class};
- Method dbname = nsDataSource.getMethod("setDatabaseName", methodParams);
- Object[] args = new Object[] {database};
- dbname.invoke(ds, args);
-
- // create the db if necessary
- Method create = nsDataSource.getMethod("setCreateDatabase", methodParams);
- args = new Object[] {"create"};
- create.invoke(ds, args);
-
- return ds;
- }
-
- private DataSource getDataSource(String logicalDbName) throws Exception {
- return getDerbyEmbeddedDataSource(logicalDbName);
- }
-
- private Connection connect(DataSource ds) throws Exception {
- return ds.getConnection(USERNAME, PW);
- }
-
- private void createPersonsTable() throws Exception {
- DataSource ds = getDataSource(DB_NAME);
- try(Connection cn = connect(ds)) {
- Statement stmt = cn.createStatement();
- try {
- stmt.execute("CREATE TABLE persons "
- + "("
- + "id INTEGER NOT NULL,"
- + "firstname VARCHAR(40) NOT NULL,"
- + "lastname VARCHAR(40) NOT NULL,"
- + "gender VARCHAR(6),"
- + "age INTEGER,"
- + "PRIMARY KEY (id)"
- + ")"
- );
- }
- catch (SQLException e) {
- if (e.getLocalizedMessage().contains("already exists"))
- return;
- else
- throw e;
- }
- }
- }
-
- private void truncatePersonsTable() throws Exception {
- createPersonsTable();
- DataSource ds = getDataSource(DB_NAME);
- try(Connection cn = connect(ds)) {
- Statement stmt = cn.createStatement();
- stmt.executeUpdate("DELETE FROM persons");
- }
- }
-
- private void populatePersonsTable(List<Person> personList) throws Exception {
- truncatePersonsTable();
- DataSource ds = getDataSource(DB_NAME);
- try(Connection cn = connect(ds)) {
- Statement stmt = cn.createStatement();
- for(Person p : personList) {
- stmt.execute(String.format(
- "INSERT INTO persons VALUES(%d,'%s','%s','%s',%d)",
- p.id, p.firstName, p.lastName, p.gender, p.age));
- }
- }
- }
-
- private TStream<Person> readPersonsTable(Topology t, JdbcStreams db, List<PersonId> personIdList, int delayMsec) {
- // Create a stream of Person from a stream of ids
- TStream<PersonId> personIds = t.collection(personIdList);
- if (delayMsec!=0) {
- personIds = PlumbingStreams.blockingOneShotDelay(personIds,
- delayMsec, TimeUnit.MILLISECONDS);
- }
- TStream<Person> rcvdPerson = db.executeStatement(personIds,
- () -> "SELECT id, firstname, lastname, gender, age"
- + " FROM persons WHERE id = ?",
- (tuple,stmt) -> stmt.setInt(1, tuple.id),
- (tuple,resultSet,exc,stream) -> {
- resultSet.next();
- int id = resultSet.getInt("id");
- String firstName = resultSet.getString("firstname");
- String lastName = resultSet.getString("lastname");
- String gender = resultSet.getString("gender");
- int age = resultSet.getInt("age");
- stream.accept(new Person(id, firstName, lastName, gender, age));
- }
- );
- return rcvdPerson;
- }
-
- private static java.util.function.Predicate<Person> newOddIdPredicate() {
- return (person) -> person.id % 2 != 0;
- }
-
- private List<String> expectedPersons(java.util.function.Predicate<Person> predicate, List<Person> persons) {
- return persons.stream()
- .filter(predicate)
- .map(person -> person.toString())
- .collect(Collectors.toList());
- }
-
- @Test
- public void testBasicRead() throws Exception {
- Topology t = this.newTopology("testBasicRead");
-
- populatePersonsTable(getPersonList());
- List<String> expected = expectedPersons(person->true, getPersonList());
-
- JdbcStreams db = new JdbcStreams(t,
- () -> getDataSource(DB_NAME),
- dataSource -> connect(dataSource));
-
- // Create a stream of Person from a stream of ids
- TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 0/*msec*/);
- TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-
- rcvd.sink(tuple -> System.out.println(
- String.format("%s rcvd: %s", t.getName(), tuple)));
- completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
- }
-
- @Test
- public void testBasicRead2() throws Exception {
- Topology t = newTopology("testBasicRead2");
- // same as testBasic but use the explicit PreparedStatement forms
- // of executeStatement().
-
- populatePersonsTable(getPersonList());
- List<String> expected = expectedPersons(person->true, getPersonList());
-
- JdbcStreams db = new JdbcStreams(t,
- () -> getDataSource(DB_NAME),
- dataSource -> connect(dataSource));
-
- // Create a stream of Person from a stream of ids
- // Delay so this runs after populating the db above
- TStream<PersonId> personIds = PlumbingStreams.blockingOneShotDelay(
- t.collection(getPersonIdList()), 3, TimeUnit.SECONDS);
- TStream<Person> rcvdPerson = db.executeStatement(personIds,
- (cn) -> cn.prepareStatement("SELECT id, firstname, lastname, gender, age"
- + " FROM persons WHERE id = ?"),
- (tuple,stmt) -> stmt.setInt(1, tuple.id),
- (tuple,resultSet,exc,stream) -> {
- resultSet.next();
- int id = resultSet.getInt("id");
- String firstName = resultSet.getString("firstname");
- String lastName = resultSet.getString("lastname");
- String gender = resultSet.getString("gender");
- int age = resultSet.getInt("age");
- stream.accept(new Person(id, firstName, lastName, gender, age));
- }
- );
- TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-
- rcvd.sink(tuple -> System.out.println(
- String.format("%s rcvd: %s", t.getName(), tuple)));
- completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
- }
-
- @Test
- public void testBasicWrite() throws Exception {
- Topology t = newTopology("testBasicWrite");
-
- truncatePersonsTable();
- List<String> expected = expectedPersons(person->true, getPersonList());
-
- JdbcStreams db = new JdbcStreams(t,
- () -> getDataSource(DB_NAME),
- dataSource -> connect(dataSource));
-
- // Add stream of Person to the db
- TStream<Person> s = t.collection(getPersonList());
- TSink<Person> sink = db.executeStatement(s,
- () -> "INSERT INTO persons VALUES(?,?,?,?,?)",
- (tuple,stmt) -> {
- stmt.setInt(1, tuple.id);
- stmt.setString(2, tuple.firstName);
- stmt.setString(3, tuple.lastName);
- stmt.setString(4, tuple.gender);
- stmt.setInt(5, tuple.age);
- }
- );
- assertNotNull(sink);
-
- // Use the same code as testBasicRead to verify the write worked.
- TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 3000/*msec*/);
- TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-
- rcvd.sink(tuple -> System.out.println(
- String.format("%s rcvd: %s", t.getName(), tuple)));
- completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
- }
-
- @Test
- public void testBasicWrite2() throws Exception {
- Topology t = newTopology("testBasicWrite2");
- // same as testBasic but use the explicit PreparedStatement forms
- // of executeStatement().
-
- truncatePersonsTable();
- List<String> expected = expectedPersons(person->true, getPersonList());
-
- JdbcStreams db = new JdbcStreams(t,
- () -> getDataSource(DB_NAME),
- dataSource -> connect(dataSource));
-
- // Add stream of Person to the db
- TStream<Person> s = t.collection(getPersonList());
- TSink<Person> sink = db.executeStatement(s,
- (cn) -> cn.prepareStatement("INSERT into PERSONS values(?,?,?,?,?)"),
- (tuple,stmt) -> {
- stmt.setInt(1, tuple.id);
- stmt.setString(2, tuple.firstName);
- stmt.setString(3, tuple.lastName);
- stmt.setString(4, tuple.gender);
- stmt.setInt(5, tuple.age);
- }
- );
-
- assertNotNull(sink);
-
- // Use the same code as testBasicRead to verify the write worked.
- TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 3000/*msec*/);
- TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-
- rcvd.sink(tuple -> System.out.println(
- String.format("%s rcvd: %s", t.getName(), tuple)));
- completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
- }
-
- @Test
- public void testBadConnectFn() throws Exception {
- Topology t = newTopology("testBadConnectFn");
- // connFn is only called for initial connect or reconnect
- // following certain failures.
- // Hence, to exercise transient connFn failures, we need to start
- // off with a failure.
-
- // TODO for transient connection failure cases, simulate a failure
- // as part of preparedStatement.execute() failing (e.g., force cn-close
- // right before it?), so that we can verify the conn is closed and
- // then reconnected
-
- populatePersonsTable(getPersonList());
- List<String> expected = expectedPersons(p->true, getPersonList().subList(1, getPersonList().size()));
- int expectedExcCnt = getPersonList().size() - expected.size();
-
- AtomicInteger connFnCnt = new AtomicInteger();
- JdbcStreams db = new JdbcStreams(t,
- () -> getDataSource(DB_NAME),
- dataSource -> {
- if (connFnCnt.incrementAndGet() == 1)
- throw new SQLException("FAKE-CONNECT-FN-FAILURE");
- else
- return connect(dataSource);
- });
-
- // Create a stream of Person from a stream of ids
- AtomicInteger executionExcCnt = new AtomicInteger();
- TStream<PersonId> personIds = t.collection(getPersonIdList());
- TStream<Person> rcvdPerson = db.executeStatement(personIds,
- () -> "SELECT id, firstname, lastname, gender, age"
- + " FROM persons WHERE id = ?",
- (tuple,stmt) -> stmt.setInt(1, tuple.id),
- (tuple,resultSet,exc,stream) -> {
- System.out.println(t.getName()+" resultHandler called tuple="+tuple+" exc="+exc);
- if (exc!=null) {
- executionExcCnt.incrementAndGet();
- return;
- }
- resultSet.next();
- int id = resultSet.getInt("id");
- String firstName = resultSet.getString("firstname");
- String lastName = resultSet.getString("lastname");
- String gender = resultSet.getString("gender");
- int age = resultSet.getInt("age");
- stream.accept(new Person(id, firstName, lastName, gender, age));
- }
- );
- TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-
- rcvd.sink(tuple -> System.out.println(
- String.format("%s rcvd: %s", t.getName(), tuple)));
- completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
- assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
- }
-
- @Test
- public void testBadSQL() throws Exception {
- Topology t = newTopology("testBadSQL");
- // the statement is nominally "retrieved" only once, not per-tuple.
- // hence, there's not much sense in trying to simulate it
- // getting called unsuccessfully, then successfully, etc.
- // however, verify the result handler gets called appropriately.
-
- populatePersonsTable(getPersonList());
- List<String> expected = Collections.emptyList();
- int expectedExcCnt = getPersonList().size() - expected.size();
-
- JdbcStreams db = new JdbcStreams(t,
- () -> getDataSource(DB_NAME),
- dataSource -> connect(dataSource));
-
- // Create a stream of Person from a stream of ids
- AtomicInteger executionExcCnt = new AtomicInteger();
- TStream<PersonId> personIds = t.collection(getPersonIdList());
- TStream<Person> rcvdPerson = db.executeStatement(personIds,
- () -> "SELECT id, firstname, lastname, gender, age"
- + " FROM persons WHERE BOGUS_XYZZY id = ?",
- (tuple,stmt) -> stmt.setInt(1, tuple.id),
- (tuple,resultSet,exc,stream) -> {
- System.out.println(t.getName()+" resultHandler called tuple="+tuple+" exc="+exc);
- if (exc!=null) {
- executionExcCnt.incrementAndGet();
- return;
- }
- resultSet.next();
- int id = resultSet.getInt("id");
- String firstName = resultSet.getString("firstname");
- String lastName = resultSet.getString("lastname");
- String gender = resultSet.getString("gender");
- int age = resultSet.getInt("age");
- stream.accept(new Person(id, firstName, lastName, gender, age));
- }
- );
- TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-
- rcvd.sink(tuple -> System.out.println(
- String.format("%s rcvd: %s", t.getName(), tuple)));
- completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
- assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
- }
-
- @Test
- public void testBadSetParams() throws Exception {
- Topology t = newTopology("testBadSetParams");
- // exercise and validate behavior with transient parameter setter failures
-
- populatePersonsTable(getPersonList());
- List<String> expected = expectedPersons(newOddIdPredicate(), getPersonList());
- int expectedExcCnt = getPersonList().size() - expected.size();
-
- JdbcStreams db = new JdbcStreams(t,
- () -> getDataSource(DB_NAME),
- dataSource -> connect(dataSource));
-
- // Create a stream of Person from a stream of ids
- AtomicInteger executionExcCnt = new AtomicInteger();
- TStream<PersonId> personIds = t.collection(getPersonIdList());
- TStream<Person> rcvdPerson = db.executeStatement(personIds,
- () -> "SELECT id, firstname, lastname, gender, age"
- + " FROM persons WHERE id = ?",
- (tuple,stmt) -> { if (tuple.id % 2 != 0)
- stmt.setInt(1, tuple.id);
- else
- stmt.setString(1, "THIS-IS-BOGUS"); },
- (tuple,resultSet,exc,stream) -> {
- System.out.println(t.getName()+" resultHandler called tuple="+tuple+" exc="+exc);
- if (exc!=null) {
- executionExcCnt.incrementAndGet();
- return;
- }
- resultSet.next();
- int id = resultSet.getInt("id");
- String firstName = resultSet.getString("firstname");
- String lastName = resultSet.getString("lastname");
- String gender = resultSet.getString("gender");
- int age = resultSet.getInt("age");
- stream.accept(new Person(id, firstName, lastName, gender, age));
- }
- );
- TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-
- rcvd.sink(tuple -> System.out.println(
- String.format("%s rcvd: %s", t.getName(), tuple)));
- completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
- assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
- }
-
- @Test
- public void testBadResultHandler() throws Exception {
- Topology t = newTopology("testBadResultHandler");
- // exercise and validate behavior with transient result handler failures
-
- populatePersonsTable(getPersonList());
- List<String> expected = expectedPersons(newOddIdPredicate(), getPersonList());
- int expectedExcCnt = getPersonList().size() - expected.size();
-
- JdbcStreams db = new JdbcStreams(t,
- () -> getDataSource(DB_NAME),
- dataSource -> connect(dataSource));
-
- // Create a stream of Person from a stream of ids
- AtomicInteger executionExcCnt = new AtomicInteger();
- TStream<PersonId> personIds = t.collection(getPersonIdList());
- TStream<Person> rcvdPerson = db.executeStatement(personIds,
- () -> "SELECT id, firstname, lastname, gender, age"
- + " FROM persons WHERE id = ?",
- (tuple,stmt) -> stmt.setInt(1, tuple.id),
- (tuple,resultSet,exc,stream) -> {
- System.out.println(t.getName()+" resultHandler called tuple="+tuple+" exc="+exc);
- if (exc!=null) {
- executionExcCnt.incrementAndGet();
- return;
- }
- resultSet.next();
- int id = resultSet.getInt(tuple.id % 2 == 0
- ? "ID-THIS-IS-BOGUS" : "id");
- String firstName = resultSet.getString("firstname");
- String lastName = resultSet.getString("lastname");
- String gender = resultSet.getString("gender");
- int age = resultSet.getInt("age");
- stream.accept(new Person(id, firstName, lastName, gender, age));
- }
- );
- TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-
- rcvd.sink(tuple -> System.out.println(
- String.format("%s rcvd: %s", t.getName(), tuple)));
- completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
- assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
- }
-
-
- @Test
- public void testNonResultSetStmt() throws Exception {
- Topology t = newTopology("testNonResultSetStmt");
- // exercise and validate use of non-ResultSet SQL statement
- // wrt proper resultHandler behavior - e.g., receive exception,
- // can generate tuples
-
- List<String> expected = Arrays.asList("once");
-
- // throw if can't create DataSource - e.g., can't locate derby
- getDataSource(DB_NAME);
-
- JdbcStreams db = new JdbcStreams(t,
- () -> getDataSource(DB_NAME),
- dataSource -> connect(dataSource));
-
- // Add stream of Person to the db
- TStream<String> trigger = t.collection(expected);
- TStream<String> dropTableStep = db.executeStatement(trigger,
- () -> "DROP TABLE swill",
- (tuple,stmt) -> { /* no params */ },
- (tuple,rs,exc,consumer) -> {
- // ok if fails
- System.out.println(t.getName()+" resultHandler drop table exc="+exc);
- if (rs!=null)
- throw new IllegalStateException("rs!=null");
- consumer.accept(tuple);
- }
- );
- TStream<String> createTableStep = db.executeStatement(dropTableStep,
- () -> "CREATE TABLE swill (id INTEGER NOT NULL)",
- (tuple,stmt) -> { /* no params */ },
- (tuple,rs,exc,consumer) -> {
- System.out.println(t.getName()+" resultHandler create table exc="+exc);
- if (rs!=null)
- throw new IllegalStateException("rs!=null");
- consumer.accept(tuple);
- }
- );
- TStream<String> failDropTable = db.executeStatement(createTableStep,
- () -> "DROP TABLE no_such_table",
- (tuple,stmt) -> { /* no params */ },
- (tuple,rs,exc,consumer) -> {
- System.out.println(t.getName()+" resultHandler fail drop table exc="+exc);
- if (exc==null)
- throw new IllegalStateException("exc==null");
- if (rs!=null)
- throw new IllegalStateException("rs!=null");
- consumer.accept(tuple);
- }
- );
- TStream<String> selectStep = db.executeStatement(failDropTable,
- () -> "SELECT * FROM swill",
- (tuple,stmt) -> { /* no params */ },
- (tuple,rs,exc,consumer) -> {
- System.out.println(t.getName()+" resultHandler select exc="+exc);
- if (rs==null)
- throw new IllegalStateException("rs==null");
- consumer.accept(tuple);
- }
- );
- TStream<String> rcvd = selectStep;
-
- completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/test/java/org/apache/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/test/java/org/apache/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java b/connectors/jdbc/src/test/java/org/apache/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java
new file mode 100644
index 0000000..4f1cacb
--- /dev/null
+++ b/connectors/jdbc/src/test/java/org/apache/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.jdbc;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * JdbcStreams connector globalization tests.
+ * <p>
+ * The tests use Apache Embedded Derby as the backing dbms.
+ * The Oracle JDK includes Derby in $JAVA_HOME/db.
+ * Manually install Derby for other JDKs if required.
+ * Arrange for the classpath to be configured by one of:
+ * <ul>
+ * <li>set the DERBY_HOME environment variable. build.xml adds
+ * $DERBY_HOME/lib/derby.jar to the classpath when running the tests.</li>
+ * <li>manually add derby.jar to the classpath</li>
+ * </ul>
+ * The tests are "skipped" if the dbms's jdbc driver can't be found.
+ */
+public class JdbcStreamsGlobalTest extends JdbcStreamsTest {
+
+ private static final List<Person> globalPersonList = new ArrayList<>();
+ static {
+ globalPersonList.add(new Person(1, "\u7ea6\u7ff0", "\u674e", "male", 35));
+ globalPersonList.add(new Person(2, "\u7b80", "\u674e", "female", 29));
+ globalPersonList.add(new Person(3, "\u6bd4\u5229", "\u5468", "male", 3));
+ }
+ private static final List<PersonId> globalPersonIdList = new ArrayList<>();
+ static {
+ for(Person p : globalPersonList) {
+ globalPersonIdList.add(new PersonId(p.id));
+ }
+ }
+
+ public List<Person> getPersonList() {
+ return globalPersonList;
+ }
+
+ public List<PersonId> getPersonIdList() {
+ return globalPersonIdList;
+ }
+
+}