You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:26 UTC

[25/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/edgent/connectors/jdbc/ResultsHandler.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/ResultsHandler.java b/connectors/jdbc/src/main/java/edgent/connectors/jdbc/ResultsHandler.java
deleted file mode 100644
index 1b5f74c..0000000
--- a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/ResultsHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.jdbc;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import edgent.function.Consumer;
-
-/**
- * Handle the results of executing an SQL statement.
- * <p>
- * Sample use:
- * <br>
- * For a ResultSet created by executing the SQL statement:
- * <br>
- * {@code "SELECT id, firstname, lastname FROM persons WHERE id = ?"}
- * <pre>{@code 
- * // create a Person tuple from db person info and add it to a stream
- * ResultsHandler<PersonId,Person> rh = 
- *     (tuple,rs,exc,consumer) -> {
- *         if (exc != null)
- *             return;
- *         rs.next();
- *         int id = rs.getInt("id");
- *         String firstName = rs.getString("firstname");
- *         String lastName = rs.getString("lastname");
- *         consumer.accept(new Person(id, firstName, lastName));
- *         }
- *      }
- *    };
- * }</pre>
- *
- * @param <T> type of the tuple inducing the SQL statement execution / results
- * @param <R> type of tuple of a result stream consumer
- */
-@FunctionalInterface
-public interface ResultsHandler<T,R> {
-    /**
-     * Process the {@code ResultSet} and add 0 or more tuples to {@code consumer}.
-     * @param tuple the tuple that induced the resultSet
-     * @param resultSet the SQL statement's result set. null if {@code exc}
-     *        is non-null or if the statement doesn't generate a {@code ResultSet}.
-     * @param exc non-null if there was an exception executing the statement.
-     *        Typically a SQLException.
-     * @param consumer a Consumer to a result stream.
-     * @throws SQLException if there are problems handling the result
-     */
-    public void handleResults(T tuple, ResultSet resultSet, Exception exc, Consumer<R> consumer) throws SQLException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/edgent/connectors/jdbc/StatementSupplier.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/StatementSupplier.java b/connectors/jdbc/src/main/java/edgent/connectors/jdbc/StatementSupplier.java
deleted file mode 100644
index 2c00791..0000000
--- a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/StatementSupplier.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.jdbc;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-/**
- * Function that supplies a JDBC SQL {@link java.sql.PreparedStatement}.
- */
-@FunctionalInterface
-public interface StatementSupplier {
-    /**
-     * Create a JDBC SQL PreparedStatement containing 0 or more parameters.
-     * <p>
-     * Sample use:
-     * <pre>{@code
-     * StatementSupplier ss = 
-     *     (cn) -> cn.prepareStatement("SELECT id, firstname, lastname"
-     *                                  + " FROM persons WHERE id = ?");
-     * }</pre>
-     * @param cn JDBC connection
-     * @return the PreparedStatement
-     * @throws SQLException on failure
-     */
-    PreparedStatement get(Connection cn) throws SQLException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/edgent/connectors/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/package-info.java b/connectors/jdbc/src/main/java/edgent/connectors/jdbc/package-info.java
deleted file mode 100644
index 6320fb2..0000000
--- a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * JDBC based database stream connector.
- * <p>
- * Stream tuples may be written to databases
- * and created or enriched by reading from databases.
- */
-package edgent.connectors.jdbc;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcConnector.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcConnector.java b/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcConnector.java
deleted file mode 100644
index 946bc8a..0000000
--- a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcConnector.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.jdbc.runtime;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.SQLTransientException;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.sql.DataSource;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import edgent.connectors.jdbc.CheckedFunction;
-import edgent.connectors.jdbc.CheckedSupplier;
-
-public class JdbcConnector {
-    
-    private static final Logger logger = LoggerFactory.getLogger(JdbcConnector.class);
-    private final CheckedSupplier<DataSource> dataSourceFn;
-    private final CheckedFunction<DataSource,Connection> connFn;
-    private DataSource ds;
-    private final Map<JdbcStatement<?,?>,Connection> cnMap = new HashMap<>();
-    
-    public JdbcConnector(CheckedSupplier<DataSource> dataSourceFn, CheckedFunction<DataSource,Connection> connFn) {
-        this.dataSourceFn = dataSourceFn;
-        this.connFn = connFn;
-    }
-    
-    Logger getLogger() {
-        return logger;
-    }
-    
-    void unregister(JdbcStatement<?,?> oplet) {
-        logger.trace("unregistering statement");
-        closeCn(oplet);
-    }
-    
-    private DataSource getDataSource() throws Exception {
-        if (ds == null) {
-            logger.trace("getting DataSource");
-            ds = dataSourceFn.get();
-        }
-        return ds;
-    }
-    
-    synchronized Connection getConnection(JdbcStatement<?,?> oplet) throws Exception {
-        // Apparently a bad idea for multiple threads (operators
-        // in our case) to use a single Connection instance.
-        Connection cn = cnMap.get(oplet);
-        if (cn == null) {
-            try {
-                logger.trace("getting jdbc connection");
-                cn = connFn.apply(getDataSource());
-                cnMap.put(oplet, cn);
-            }
-            catch (Exception e) {
-                logger.error("unable to connect", e);
-                throw e;
-            }
-        }
-        return cn;
-    }
-
-    void statementFailed(JdbcStatement<?,?> oplet, Exception e) {
-        logger.error("statement failed", e);
-        if (!(e instanceof SQLTransientException)) {
-            closeCn(oplet);
-        }
-    }
-    
-    private synchronized void closeCn(JdbcStatement<?,?> oplet) {
-        try {
-            Connection cn = cnMap.remove(oplet);
-            if (cn != null) {
-                logger.trace("closing jdbc connection");
-                cn.close();
-            }
-        }
-        catch (SQLException e) {
-            logger.error("jdbc close cn failed", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcStatement.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcStatement.java b/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcStatement.java
deleted file mode 100644
index 7d7ec9e..0000000
--- a/connectors/jdbc/src/main/java/edgent/connectors/jdbc/runtime/JdbcStatement.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.jdbc.runtime;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-
-import edgent.connectors.jdbc.ParameterSetter;
-import edgent.connectors.jdbc.ResultsHandler;
-import edgent.connectors.jdbc.StatementSupplier;
-import edgent.function.Consumer;
-import edgent.function.Function;
-
-public class JdbcStatement<T,R> implements Function<T,Iterable<R>>,Consumer<T>,AutoCloseable {
-    private static final long serialVersionUID = 1L;
-    private final Logger logger;
-    private final JdbcConnector connector;
-    private final StatementSupplier stmtSupplier;
-    private final ParameterSetter<T> paramSetter;
-    private final ResultsHandler<T,R> resultsHandler;
-    private PreparedStatement stmt;
-    private long nTuples;
-    private long nTuplesFailed;
-    
-    private void closeStmt() {
-        if (stmt != null) {
-            logger.trace("closing statement");
-            PreparedStatement tmp = stmt;
-            stmt = null;
-            try {
-                tmp.close();
-            }
-            catch (SQLException e) {
-                logger.error("close stmt failed", e);
-            }
-        }
-    }
-    
-    public JdbcStatement(JdbcConnector connector,
-            StatementSupplier stmtSupplier, ParameterSetter<T> paramSetter,
-                ResultsHandler<T,R> resultsHandler) {
-        this.logger = connector.getLogger();
-        this.connector = connector;
-        this.stmtSupplier = stmtSupplier;
-        this.paramSetter = paramSetter;
-        this.resultsHandler = resultsHandler;
-    }
-    
-    public JdbcStatement(JdbcConnector connector,
-            StatementSupplier stmtSupplier, ParameterSetter<T> paramSetter) {
-        this(connector, stmtSupplier, paramSetter, null);
-    }
-
-    @Override
-    public void accept(T tuple) {
-        executeStatement(tuple, null);
-    }
-
-    @Override
-    public Iterable<R> apply(T tuple) {
-        // lame impl for large result sets but will do for now.
-        List<R> results = new ArrayList<>();
-        executeStatement(tuple, results);
-        return results;
-    }
-    
-    private void executeStatement(T tuple, List<R> results) {
-        nTuples++;
-        try {
-            logger.debug("executing statement nTuples={} nTuplesFailed={}", nTuples, nTuplesFailed);
-            Connection cn = connector.getConnection(this);
-            PreparedStatement stmt = getPreparedStatement(cn);
-            paramSetter.setParameters(tuple, stmt);
-            boolean hasResult = stmt.execute();
-            if (resultsHandler != null) {
-                if (!hasResult) {
-                    resultsHandler.handleResults(tuple, null/*rs*/, null/*exc*/,
-                            (result) -> results.add(result));
-                }
-                else {
-                    do {
-                        try (ResultSet rs = stmt.getResultSet()) {
-                            resultsHandler.handleResults(tuple, rs, null/*exc*/,
-                                    (result) -> results.add(result));
-                        }
-                    } while (stmt.getMoreResults());
-                }
-            }
-        }
-        catch (Exception e) {
-            nTuplesFailed++;
-            logger.trace("executing statement failed nTuples={} nTuplesFailed={}", nTuples, nTuplesFailed);
-            if (resultsHandler != null) {
-                try {
-                    resultsHandler.handleResults(tuple, null/*rs*/, e, 
-                            (result) -> results.add(result));
-                }
-                catch (Exception e2) {
-                    logger.error("failure result handler failed", e2);
-                }
-            }
-            closeStmt();
-            connector.statementFailed(this, e);
-        }
-    }
-    
-    private PreparedStatement getPreparedStatement(Connection cn) throws SQLException {
-        if (stmt == null) {
-            stmt = stmtSupplier.get(cn);
-        }
-        return stmt;
-    }
-
-    @Override
-    public void close() throws Exception {
-        closeStmt();
-        connector.unregister(this);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedFunction.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedFunction.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedFunction.java
new file mode 100644
index 0000000..93ce314
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedFunction.java
@@ -0,0 +1,36 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc;
+
+/**
+ * Function to apply a function to an input value and return a result.
+ *
+ * @param <T> input stream tuple type
+ * @param <R> result stream tuple type
+ */
+@FunctionalInterface
+public interface CheckedFunction<T,R> {
+    /**
+     * Apply a function to {@code t} and return the result.
+     * @param t input value
+     * @return the function result.
+     * @throws Exception if there are processing errors.
+     */
+    R apply(T t) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedSupplier.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedSupplier.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedSupplier.java
new file mode 100644
index 0000000..7924c9e
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/CheckedSupplier.java
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc;
+
+/**
+ * Function that supplies a result and may throw an Exception.
+ *
+ * @param <T> stream tuple type
+ */
+@FunctionalInterface
+public interface CheckedSupplier<T> {
+    /**
+     * Get a result.
+     * @return the result
+     * @throws Exception if there are errors
+     */
+    T get() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/JdbcStreams.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/JdbcStreams.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/JdbcStreams.java
new file mode 100644
index 0000000..aad6316
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/JdbcStreams.java
@@ -0,0 +1,294 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc;
+
+import java.sql.Connection;
+
+import javax.sql.DataSource;
+
+import org.apache.edgent.connectors.jdbc.runtime.JdbcConnector;
+import org.apache.edgent.connectors.jdbc.runtime.JdbcStatement;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * {@code JdbcStreams} is a streams connector to a database via the
+ * JDBC API {@code java.sql} package.
+ * <p>
+ * The connector provides general SQL access to a database, enabling
+ * writing of a stream's tuples to a database, creating a stream from
+ * database query results, and other operations.  
+ * Knowledge of the JDBC API is required.
+ * <p>
+ * Use of the connector involves:
+ * <ul>
+ * <li>constructing a streams connector to a database by providing it with:
+ *     <ul>
+ *     <li>a JDBC {@link javax.sql.DataSource}</li>
+ *     <li>a function that creates a JDBC {@link java.sql.Connection} 
+ *         from the {@code DataSource}</li>
+ *     </ul>
+ *     </li>
+ * <li>defining SQL statement executions and results handling by calling one
+ *     of the {@code executeStatement()} methods:
+ *     <ul>
+ *     <li>specify an SQL statement String or define a {@link StatementSupplier}.
+ *         A {@code StatementSupplier} 
+ *         creates a JDBC {@link java.sql.PreparedStatement} for an SQL statement
+ *         (e.g., a query, insert, update, etc operation).</li>
+ *     <li>define a {@link ParameterSetter}.  A {@code ParameterSetter}
+ *         sets the parameter values in a generic {@code PreparedStatement}.</li>
+ *     <li>define a {@link ResultsHandler} as required.
+ *         A {@code ResultsHandler} processes a JDBC
+ *         {@link java.sql.ResultSet} created by executing a SQL statement,
+ *         optionally creating one or more tuples from the results
+ *         and adding them to a stream.</li>
+ *     </ul>
+ *     </li>
+ * </ul>
+ * <p>
+ * Sample use:
+ * <pre>{@code
+ *  // construct a connector to the database
+ *  JdbcStreams mydb = new JdbcStreams(topology,
+ *                // fn to create the javax.sql.DataSource to the db
+ *                () -> {
+ *                       Context ctx = new javax.naming.InitialContext();
+ *                       return (DataSource) ctx.lookup("jdbc/myDb");
+ *                      },
+ *                // fn to connect to the db (via the DataSource)
+ *                dataSource -> dataSource.getConnection(username,pw)
+ *              );
+ *
+ *  // ----------------------------------------------------
+ *  //
+ *  // Write a Person stream to a table
+ *  //                       
+ *  TStream<Person> persons = ...
+ *  TSink sink = mydb.executeStatement(persons,
+ *           () -> "INSERT INTO persons VALUES(?,?,?)",
+ *           (person,stmt) -> {
+ *               stmt.setInt(1, person.getId());
+ *               stmt.setString(2, person.getFirstName());
+ *               stmt.setString(3, person.getLastName());
+ *               }
+ *           );
+ *           
+ *  // ----------------------------------------------------
+ *  //
+ *  // Create a stream of Person from a PersonId tuple
+ *  //
+ *  TStream<PersonId> personIds = ...
+ *  TStream<Person> persons = mydb.executeStatement(personIds,
+ *            () -> "SELECT id, firstname, lastname FROM persons WHERE id = ?",
+ *            (personId,stmt) -> stmt.setInt(1,personId.getId()),
+ *            (personId,rs,exc,consumer) -> {
+ *                    if (exc != null) {
+ *                        // statement failed, do something
+ *                        int ecode = exc.getErrorCode();
+ *                        String state = exc.getSQLState();
+ *                        ...  // consumer.accept(...) if desired.
+ *                    }
+ *                    else {
+ *                        rs.next();
+ *                        int id = rs.getInt("id");
+ *                        String firstName = rs.getString("firstname");
+ *                        String lastName = rs.getString("lastname");
+ *                        consumer.accept(new Person(id, firstName, lastName));
+ *                    }
+ *                }
+ *            ); 
+ *  persons.print();
+ *    
+ *  // ----------------------------------------------------
+ *  //
+ *  // Delete all the rows from a table
+ *  //
+ *  TStream<String> beacon = topology.strings("once");
+ *  mydb.executeStatement(beacon,
+ *               () -> "DELETE FROM persons",
+ *               (tuple,stmt) -> { }  // no params to set
+ *              ); 
+ * }</pre>
+ */
+public class JdbcStreams {
+    @SuppressWarnings("unused")
+    private final Topology top;
+    private final JdbcConnector connector;
+    
+    /**
+     * Create a connector that uses a JDBC {@link DataSource} object to get
+     * a database connection.
+     * <p>
+     * In some environments it's common for JDBC DataSource objects to
+     * have been registered in JNDI. In such cases the dataSourceFn can be:
+     * <pre>{@code
+     * () -> {  Context ctx = new javax.naming.InitialContext();
+     *          return (DataSource) ctx.lookup("jdbc/" + logicalDbName);
+     *       }
+     * }</pre>
+     * <p>
+     * Alternatively, a DataSource can be created using a dbms implementation's
+     * DataSource class.
+     * For example:
+     * <pre>{@code
+     * () -> { EmbeddedDataSource ds = new org.apache.derby.jdbc.EmbeddedDataSource();
+     *         ds.setDatabaseName(dbName);
+     *         ds.setCreateDatabase("create");
+     *         return ds;
+     *       }
+     * }</pre>
+     * <p>
+     * Once {@code dataSourceFn} returns a DataSource it will not be called again. 
+     * <p>
+     * {@code connFn} is called only if a new JDBC connection is needed.
+     * It is not called per-processed-tuple.  JDBC failures in
+     * {@code executeStatement()} can result in a JDBC connection getting
+     * closed and {@code connFn} is subsequently called to reconnect.
+     * 
+     * @param topology topology that this connector is for
+     * @param dataSourceFn function that yields the {@link DataSource}
+     *              for the database.
+     * @param connFn function that yields a {@link Connection} from a {@code DataSource}.
+     */
+    public JdbcStreams(Topology topology, CheckedSupplier<DataSource> dataSourceFn, CheckedFunction<DataSource,Connection> connFn) {
+        this.top = topology;
+        this.connector = new JdbcConnector(dataSourceFn, connFn);
+    }
+
+    /**
+     * For each tuple on {@code stream} execute an SQL statement and
+     * add 0 or more resulting tuples to a result stream.
+     * <p>
+     * Same as using {@link #executeStatement(TStream, StatementSupplier, ParameterSetter, ResultsHandler)}
+     * specifying {@code cn -> cn.prepareStatement(stmtSupplier.get())}
+     * for the {@code StatementSupplier}.
+     * 
+     * @param <T> Tuple type for input stream
+     * @param <R> Tuple type of result stream
+     * @param stream tuples to execute a SQL statement on behalf of
+     * @param stmtSupplier an SQL statement
+     * @param paramSetter function to set SQL statement parameters
+     * @param resultsHandler SQL ResultSet handler
+     * @return result Stream
+     */
+    public <T,R> TStream<R> executeStatement(TStream<T> stream,
+            Supplier<String> stmtSupplier,
+            ParameterSetter<T> paramSetter,
+            ResultsHandler<T,R> resultsHandler
+            ) {
+        return stream.flatMap(new JdbcStatement<T,R>(connector,
+                cn -> cn.prepareStatement(stmtSupplier.get()),
+                paramSetter, resultsHandler));
+    }
+    
+    /**
+     * For each tuple on {@code stream} execute an SQL statement and
+     * add 0 or more resulting tuples to a result stream.
+     * <p>
+     * Use to transform T tuples to R tuples, or
+     * enrich/update T tuples with additional information from a database.
+     * It can also be used to load a table into a stream,
+     * using a T to trigger that.
+     * Or to execute non-ResultSet generating
+     * SQL statements and receive failure info and/or generate tuple(s)
+     * upon completion.
+     * <p>
+     * {@code stmtSupplier} is called only once per new JDBC connection/reconnect.
+     * It is not called per-tuple.  Hence, with the exception of statement
+     * parameters, the returned statement is expected to be unchanging.
+     * Failures executing a statement can result in the connection getting
+     * closed and subsequently reconnected, resulting in another
+     * {@code stmtSupplier} call.
+     * <p>
+     * {@code resultsHandler} is called for every tuple.
+     * If {@code resultsHandler} throws an Exception, it is called a
+     * second time for the tuple with a non-null exception argument.
+     * 
+     * @param <T> Tuple type for input stream
+     * @param <R> Tuple type of result stream
+     * @param stream tuples to execute a SQL statement on behalf of
+     * @param stmtSupplier an SQL statement
+     * @param paramSetter function to set SQL statement parameters
+     * @param resultsHandler SQL ResultSet handler
+     * @return result Stream
+     * @see #executeStatement(TStream, Supplier, ParameterSetter, ResultsHandler)
+     */
+    public <T,R> TStream<R> executeStatement(TStream<T> stream,
+            StatementSupplier stmtSupplier,
+            ParameterSetter<T> paramSetter,
+            ResultsHandler<T,R> resultsHandler
+            ) {
+        return stream.flatMap(new JdbcStatement<T,R>(connector,
+                stmtSupplier, paramSetter, resultsHandler));
+    }
+    
+    /**
+     * For each tuple on {@code stream} execute an SQL statement.
+     * <p>
+     * Same as using {@link #executeStatement(TStream, StatementSupplier, ParameterSetter)}
+     * specifying {@code cn -> cn.prepareStatement(stmtSupplier.get())}
+     * for the {@code StatementSupplier}.
+     *
+     * @param <T> Tuple type
+     * @param stream tuples to execute a SQL statement on behalf of
+     * @param stmtSupplier an SQL statement
+     * @param paramSetter function to set SQL statement parameters
+     * @return TSink sink element representing termination of this stream.
+     */
+    public <T> TSink<T> executeStatement(TStream<T> stream,
+            Supplier<String> stmtSupplier,
+            ParameterSetter<T> paramSetter
+            ) {
+        return stream.sink(new JdbcStatement<T,Object>(connector,
+                cn -> cn.prepareStatement(stmtSupplier.get()),
+                paramSetter));
+    }
+
+    /**
+     * For each tuple on {@code stream} execute an SQL statement.
+     * <p>
+     * Use to write a stream of T to a table.
+     * More generally, use a T as a trigger to execute some SQL statement
+     * that doesn't yield a ResultSet.
+     * <p>
+     * Use a non-sink form of {@code executeStatement()} (forms
+     * that take a {@code ResultsHandler}), if you want to:
+     * <ul>
+     * <li>be notified of statement execution failures</li>
+     * <li>generate tuple(s) after the statement has run.</li>
+     * </ul>
+     *
+     * @param <T> Tuple type
+     * @param stream tuples to execute a SQL statement on behalf of
+     * @param stmtSupplier an SQL statement
+     * @param paramSetter function to set SQL statement parameters
+     * @return TSink sink element representing termination of this stream.
+     * @see #executeStatement(TStream, Supplier, ParameterSetter)
+     */
+    public <T> TSink<T> executeStatement(TStream<T> stream,
+            StatementSupplier stmtSupplier,
+            ParameterSetter<T> paramSetter
+            ) {
+        return stream.sink(new JdbcStatement<T,Object>(connector,
+                stmtSupplier, paramSetter));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ParameterSetter.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ParameterSetter.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ParameterSetter.java
new file mode 100644
index 0000000..e38ab11
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ParameterSetter.java
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Function that sets parameters in a JDBC SQL {@link java.sql.PreparedStatement}.
+ *
+ * @param <T> stream tuple type
+ */
+@FunctionalInterface
+public interface ParameterSetter<T> {
+    /**
+     * Set 0 or more parameters in a JDBC PreparedStatement before it is executed.
+     * <p>
+     * Sample use for a PreparedStatement of:
+     * <br>
+     * {@code "SELECT id, firstname, lastname FROM persons WHERE id = ?"}
+     * <pre>{@code
+     * ParameterSetter<PersonId> ps = (personId,stmt) -> stmt.setInt(1, personId.getId());
+     * }</pre>
+     *
+     * @param t stream tuple supplying the parameter values
+     * @param stmt the PreparedStatement whose parameters are to be set
+     * @throws SQLException on failure
+     */
+    void setParameters(T t, PreparedStatement stmt) throws SQLException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ResultsHandler.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ResultsHandler.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ResultsHandler.java
new file mode 100644
index 0000000..668b6ca
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/ResultsHandler.java
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.edgent.function.Consumer;
+
+/**
+ * Handle the results of executing an SQL statement.
+ * <p>
+ * Sample use:
+ * <br>
+ * For a ResultSet created by executing the SQL statement:
+ * <br>
+ * {@code "SELECT id, firstname, lastname FROM persons WHERE id = ?"}
+ * <pre>{@code 
+ * // create a Person tuple from db person info and add it to a stream
+ * ResultsHandler<PersonId,Person> rh = 
+ *     (tuple,rs,exc,consumer) -> {
+ *         // rs is null when the statement failed (exc != null)
+ *         if (exc != null)
+ *             return;
+ *         if (rs.next()) {
+ *             int id = rs.getInt("id");
+ *             String firstName = rs.getString("firstname");
+ *             String lastName = rs.getString("lastname");
+ *             consumer.accept(new Person(id, firstName, lastName));
+ *         }
+ *     };
+ * }</pre>
+ *
+ * @param <T> type of the tuple inducing the SQL statement execution / results
+ * @param <R> type of tuple of a result stream consumer
+ */
+@FunctionalInterface
+public interface ResultsHandler<T,R> {
+    /**
+     * Process the {@code ResultSet} and add 0 or more tuples to {@code consumer}.
+     * @param tuple the tuple that induced the resultSet
+     * @param resultSet the SQL statement's result set. null if {@code exc}
+     *        is non-null or if the statement doesn't generate a {@code ResultSet}.
+     * @param exc non-null if there was an exception executing the statement.
+     *        Typically a SQLException.
+     * @param consumer a Consumer to a result stream.
+     * @throws SQLException if there are problems handling the result
+     */
+    public void handleResults(T tuple, ResultSet resultSet, Exception exc, Consumer<R> consumer) throws SQLException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/StatementSupplier.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/StatementSupplier.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/StatementSupplier.java
new file mode 100644
index 0000000..aa2137a
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/StatementSupplier.java
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Function that supplies a JDBC SQL {@link java.sql.PreparedStatement}.
+ */
+@FunctionalInterface
+public interface StatementSupplier {
+    /**
+     * Create a JDBC SQL PreparedStatement containing 0 or more parameters.
+     * <p>
+     * Sample use:
+     * <pre>{@code
+     * StatementSupplier ss = 
+     *     (cn) -> cn.prepareStatement("SELECT id, firstname, lastname"
+     *                                  + " FROM persons WHERE id = ?");
+     * }</pre>
+     * @param cn JDBC connection on which to create the statement
+     * @return the newly created PreparedStatement
+     * @throws SQLException if the statement cannot be created
+     */
+    PreparedStatement get(Connection cn) throws SQLException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/package-info.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/package-info.java
new file mode 100644
index 0000000..16fd25d
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/package-info.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * JDBC based database stream connector.
+ * <p>
+ * Stream tuples may be written to databases
+ * and created or enriched by reading from databases.
+ */
+package org.apache.edgent.connectors.jdbc;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcConnector.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcConnector.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcConnector.java
new file mode 100644
index 0000000..93c8287
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcConnector.java
@@ -0,0 +1,101 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc.runtime;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLTransientException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.apache.edgent.connectors.jdbc.CheckedFunction;
+import org.apache.edgent.connectors.jdbc.CheckedSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcConnector { // Owns the DataSource and one JDBC Connection per JdbcStatement oplet.
+    
+    private static final Logger logger = LoggerFactory.getLogger(JdbcConnector.class);
+    private final CheckedSupplier<DataSource> dataSourceFn;
+    private final CheckedFunction<DataSource,Connection> connFn;
+    private DataSource ds; // obtained lazily by getDataSource()
+    private final Map<JdbcStatement<?,?>,Connection> cnMap = new HashMap<>(); // accessed only from synchronized methods
+    /** Stores the two functions; neither is invoked until a connection is first requested. */
+    public JdbcConnector(CheckedSupplier<DataSource> dataSourceFn, CheckedFunction<DataSource,Connection> connFn) {
+        this.dataSourceFn = dataSourceFn;
+        this.connFn = connFn;
+    }
+    /** Returns the class-wide logger; JdbcStatement logs through it as well. */
+    Logger getLogger() {
+        return logger;
+    }
+    /** Closes and forgets the connection held for {@code oplet}, if any. */
+    void unregister(JdbcStatement<?,?> oplet) {
+        logger.trace("unregistering statement");
+        closeCn(oplet);
+    }
+    /** Returns the DataSource, obtaining it from {@code dataSourceFn} on first use; called only from the synchronized getConnection(). */
+    private DataSource getDataSource() throws Exception {
+        if (ds == null) {
+            logger.trace("getting DataSource");
+            ds = dataSourceFn.get();
+        }
+        return ds;
+    }
+    /** Returns the connection for {@code oplet}, creating and caching one if it has none; a connect failure is logged and rethrown. */
+    synchronized Connection getConnection(JdbcStatement<?,?> oplet) throws Exception {
+        // Apparently a bad idea for multiple threads (operators
+        // in our case) to use a single Connection instance.
+        Connection cn = cnMap.get(oplet);
+        if (cn == null) {
+            try {
+                logger.trace("getting jdbc connection");
+                cn = connFn.apply(getDataSource());
+                cnMap.put(oplet, cn);
+            }
+            catch (Exception e) {
+                logger.error("unable to connect", e);
+                throw e;
+            }
+        }
+        return cn;
+    }
+    /** Logs the failure and, unless it is a SQLTransientException, closes the oplet's connection so the next getConnection() creates a new one. */
+    void statementFailed(JdbcStatement<?,?> oplet, Exception e) {
+        logger.error("statement failed", e);
+        if (!(e instanceof SQLTransientException)) {
+            closeCn(oplet);
+        }
+    }
+    /** Removes and closes the oplet's connection, if any; a close failure is logged, not propagated. */
+    private synchronized void closeCn(JdbcStatement<?,?> oplet) {
+        try {
+            Connection cn = cnMap.remove(oplet); // removed first, so the entry is gone even if close() throws
+            if (cn != null) {
+                logger.trace("closing jdbc connection");
+                cn.close();
+            }
+        }
+        catch (SQLException e) {
+            logger.error("jdbc close cn failed", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcStatement.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcStatement.java b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcStatement.java
new file mode 100644
index 0000000..7c557ec
--- /dev/null
+++ b/connectors/jdbc/src/main/java/org/apache/edgent/connectors/jdbc/runtime/JdbcStatement.java
@@ -0,0 +1,141 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.jdbc.runtime;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.edgent.connectors.jdbc.ParameterSetter;
+import org.apache.edgent.connectors.jdbc.ResultsHandler;
+import org.apache.edgent.connectors.jdbc.StatementSupplier;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.slf4j.Logger;
+
+public class JdbcStatement<T,R> implements Function<T,Iterable<R>>,Consumer<T>,AutoCloseable { // Executes one SQL statement per stream tuple.
+    private static final long serialVersionUID = 1L;
+    private final Logger logger;
+    private final JdbcConnector connector;
+    private final StatementSupplier stmtSupplier;
+    private final ParameterSetter<T> paramSetter;
+    private final ResultsHandler<T,R> resultsHandler; // null for the 3-arg constructor
+    private PreparedStatement stmt; // cached statement; null until first use and after closeStmt()
+    private long nTuples; // tuples seen by executeStatement(); used in log messages
+    private long nTuplesFailed; // tuples whose execution threw; used in log messages
+    /** Closes the cached PreparedStatement, if any; a close failure is logged, not propagated. */
+    private void closeStmt() {
+        if (stmt != null) {
+            logger.trace("closing statement");
+            PreparedStatement tmp = stmt;
+            stmt = null; // cleared before close() so the field is reset even if close() throws
+            try {
+                tmp.close();
+            }
+            catch (SQLException e) {
+                logger.error("close stmt failed", e);
+            }
+        }
+    }
+    /** Creates a statement oplet; results and failures are passed to {@code resultsHandler} when it is non-null. */
+    public JdbcStatement(JdbcConnector connector,
+            StatementSupplier stmtSupplier, ParameterSetter<T> paramSetter,
+                ResultsHandler<T,R> resultsHandler) {
+        this.logger = connector.getLogger();
+        this.connector = connector;
+        this.stmtSupplier = stmtSupplier;
+        this.paramSetter = paramSetter;
+        this.resultsHandler = resultsHandler;
+    }
+    /** Creates a statement oplet with no ResultsHandler. */
+    public JdbcStatement(JdbcConnector connector,
+            StatementSupplier stmtSupplier, ParameterSetter<T> paramSetter) {
+        this(connector, stmtSupplier, paramSetter, null);
+    }
+    /** Consumer form: executes the statement for {@code tuple} without collecting results. */
+    @Override
+    public void accept(T tuple) {
+        executeStatement(tuple, null); // NOTE(review): results is null; a non-null resultsHandler that emits a result would call add() on null
+    }
+    /** Function form: executes the statement for {@code tuple} and returns the results the ResultsHandler added (possibly none). */
+    @Override
+    public Iterable<R> apply(T tuple) {
+        // lame impl for large result sets but will do for now.
+        List<R> results = new ArrayList<>();
+        executeStatement(tuple, results);
+        return results;
+    }
+    /** Runs the statement for one tuple, passing each ResultSet, or the failure, to the ResultsHandler if there is one. */
+    private void executeStatement(T tuple, List<R> results) {
+        nTuples++;
+        try {
+            logger.debug("executing statement nTuples={} nTuplesFailed={}", nTuples, nTuplesFailed);
+            Connection cn = connector.getConnection(this);
+            PreparedStatement stmt = getPreparedStatement(cn); // local shadows the field of the same name
+            paramSetter.setParameters(tuple, stmt);
+            boolean hasResult = stmt.execute();
+            if (resultsHandler != null) {
+                if (!hasResult) {
+                    resultsHandler.handleResults(tuple, null/*rs*/, null/*exc*/,
+                            (result) -> results.add(result));
+                }
+                else {
+                    do {
+                        try (ResultSet rs = stmt.getResultSet()) {
+                            resultsHandler.handleResults(tuple, rs, null/*exc*/,
+                                    (result) -> results.add(result));
+                        }
+                    } while (stmt.getMoreResults()); // repeat while another ResultSet follows
+                }
+            }
+        }
+        catch (Exception e) { // also catches exceptions thrown by resultsHandler in the try block
+            nTuplesFailed++;
+            logger.trace("executing statement failed nTuples={} nTuplesFailed={}", nTuples, nTuplesFailed);
+            if (resultsHandler != null) {
+                try {
+                    resultsHandler.handleResults(tuple, null/*rs*/, e, 
+                            (result) -> results.add(result));
+                }
+                catch (Exception e2) {
+                    logger.error("failure result handler failed", e2);
+                }
+            }
+            closeStmt(); // drop the cached statement; getPreparedStatement() re-creates it on the next call
+            connector.statementFailed(this, e); // logs; closes the connection unless e is a SQLTransientException
+        }
+    }
+    /** Returns the cached PreparedStatement, creating it via {@code stmtSupplier} when none is cached. */
+    private PreparedStatement getPreparedStatement(Connection cn) throws SQLException {
+        if (stmt == null) {
+            stmt = stmtSupplier.get(cn);
+        }
+        return stmt;
+    }
+    /** Closes the cached statement, then unregisters from the connector, which closes this oplet's connection. */
+    @Override
+    public void close() throws Exception {
+        closeStmt();
+        connector.unregister(this);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java b/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java
deleted file mode 100644
index ba2dfa7..0000000
--- a/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.jdbc;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * JdbcStreams connector globalization tests.
- * <p>
- * The tests use Apache Embedded Derby as the backing dbms.
- * The Oracle JDK includes Derby in $JAVA_HOME/db.
- * Manually install Derby for other JDKs if required.
- * Arrange for the classpath to be configured by one of:
- * <ul>
- * <li>set the DERBY_HOME environment variable.  build.xml adds
- *     $DERBY_HOME/lib/derby.jar to the classpath when running the tests.</li>
- * <li>manually add derby.jar to the classpath</li>
- * </ul>
- * The tests are "skipped" if the dbms's jdbc driver can't be found.
- */
-public class JdbcStreamsGlobalTest extends JdbcStreamsTest {
-
-    private static final List<Person> globalPersonList = new ArrayList<>();
-    static {
-        globalPersonList.add(new Person(1, "\u7ea6\u7ff0", "\u674e", "male", 35));
-        globalPersonList.add(new Person(2, "\u7b80", "\u674e", "female", 29));
-        globalPersonList.add(new Person(3, "\u6bd4\u5229", "\u5468", "male", 3));
-    }
-    private static final List<PersonId> globalPersonIdList = new ArrayList<>();
-    static {
-        for(Person p : globalPersonList) {
-            globalPersonIdList.add(new PersonId(p.id));
-        }
-    }
-
-    public List<Person> getPersonList() {
-        return globalPersonList;
-    }
-
-    public List<PersonId> getPersonIdList() {
-        return globalPersonIdList;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsTest.java b/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsTest.java
deleted file mode 100644
index b24c7e7..0000000
--- a/connectors/jdbc/src/test/java/edgent/test/connectors/jdbc/JdbcStreamsTest.java
+++ /dev/null
@@ -1,635 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.jdbc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assume.assumeTrue;
-
-import java.lang.reflect.Method;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import javax.sql.DataSource;
-
-import org.junit.Test;
-
-import edgent.connectors.jdbc.JdbcStreams;
-import edgent.test.connectors.common.ConnectorTestBase;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-
-/**
- * JdbcStreams connector tests.
- * <p>
- * The tests use Apache Embedded Derby as the backing dbms.
- * The Oracle JDK includes Derby in $JAVA_HOME/db.
- * Manually install Derby for other JDKs if required.
- * Arrange for the classpath to be configured by one of:
- * <ul>
- * <li>set the DERBY_HOME environment variable.  build.xml adds 
- *     $DERBY_HOME/lib/derby.jar to the classpath when running the tests.</li>
- * <li>manually add derby.jar to the classpath</li>
- * </ul>
- * The tests are "skipped" if the dbms's jdbc driver can't be found.
- */
-public class JdbcStreamsTest  extends ConnectorTestBase {
-    
-    private static final int SEC_TIMEOUT = 10;
-    private final static String DB_NAME = "JdbcStreamsTestDb";
-    private final static String USERNAME = System.getProperty("user.name");
-    private final static String PW = "none";
-    private static final List<Person> personList = new ArrayList<>();
-    static {
-        personList.add(new Person(1, "John", "Doe", "male", 35));
-        personList.add(new Person(2, "Jane", "Doe", "female", 29));
-        personList.add(new Person(3, "Billy", "McDoe", "male", 3));
-    }
-    private static final List<PersonId> personIdList = new ArrayList<>();
-    static {
-        for(Person p : personList) {
-            personIdList.add(new PersonId(p.id));
-        }
-    }
-    
-    static class Person {
-        int id;
-        String firstName;
-        String lastName;
-        String gender;
-        int age;
-        Person(int id, String first, String last, String gender, int age) {
-            this.id = id;
-            this.firstName = first;
-            this.lastName = last;
-            this.gender = gender;
-            this.age = age;
-        }
-        public String toString() {
-            return String.format("id=%d first=%s last=%s gender=%s age=%d",
-                    id, firstName, lastName, gender, age);
-        }
-    }
-    
-    static class PersonId {
-        int id;
-        PersonId(int id) {
-            this.id = id;
-        }
-        public String toString() {
-            return String.format("id=%d", id);
-        }
-    }
-
-    public List<Person> getPersonList() {
-        return personList;
-    }
-
-    public List<PersonId> getPersonIdList() {
-        return personIdList;
-    }
-
-    DataSource getDerbyEmbeddedDataSource(String database) throws Exception
-    {
-        // Avoid a compile-time dependency to the jdbc driver.
-        // At runtime, require that the classpath can find it.
-        // e.g., build.xml adds $DERBY_HOME/lib/derby.jar to the test classpath
-
-        String DERBY_DATA_SOURCE = "org.apache.derby.jdbc.EmbeddedDataSource";
-    
-        Class<?> nsDataSource = null;
-        try {
-            nsDataSource = Class.forName(DERBY_DATA_SOURCE);
-        }
-        catch (ClassNotFoundException e) {
-            String msg = "Fix the test classpath. ";
-            if (System.getenv("DERBY_HOME") == null) {
-                msg += "DERBY_HOME not set. ";
-            }
-            msg += "Class not found: "+e.getLocalizedMessage();
-            System.err.println(msg);
-            assumeTrue(false);
-        }
-        DataSource ds = (DataSource) nsDataSource.newInstance();
-
-        @SuppressWarnings("rawtypes")
-        Class[] methodParams = new Class[] {String.class};
-        Method dbname = nsDataSource.getMethod("setDatabaseName", methodParams);
-        Object[] args = new Object[] {database};
-        dbname.invoke(ds, args);
-
-        // create the db if necessary
-        Method create = nsDataSource.getMethod("setCreateDatabase", methodParams);
-        args = new Object[] {"create"};
-        create.invoke(ds, args);
-    
-        return ds;
-    }
-    
-    private DataSource getDataSource(String logicalDbName) throws Exception {
-        return getDerbyEmbeddedDataSource(logicalDbName);
-    }
-    
-    private Connection connect(DataSource ds) throws Exception {
-        return ds.getConnection(USERNAME, PW);
-    }
-    
-    private void createPersonsTable() throws Exception {
-        DataSource ds = getDataSource(DB_NAME);
-        try(Connection cn = connect(ds)) {
-            Statement stmt = cn.createStatement();
-            try {
-                stmt.execute("CREATE TABLE persons "
-                        + "("
-                        + "id INTEGER NOT NULL,"
-                        + "firstname VARCHAR(40) NOT NULL,"
-                        + "lastname VARCHAR(40) NOT NULL,"
-                        + "gender VARCHAR(6),"
-                        + "age INTEGER,"
-                        + "PRIMARY KEY (id)"
-                        + ")"
-                        );
-            }
-            catch (SQLException e) {
-                if (e.getLocalizedMessage().contains("already exists"))
-                    return;
-                else
-                    throw e;
-            }
-        }
-    }
-    
-    private void truncatePersonsTable() throws Exception {
-        createPersonsTable();
-        DataSource ds = getDataSource(DB_NAME);
-        try(Connection cn = connect(ds)) {
-            Statement stmt = cn.createStatement();
-            stmt.executeUpdate("DELETE FROM persons");
-        }
-    }
-    
-    private void populatePersonsTable(List<Person> personList) throws Exception {
-        truncatePersonsTable();
-        DataSource ds = getDataSource(DB_NAME);
-        try(Connection cn = connect(ds)) {
-            Statement stmt = cn.createStatement();
-            for(Person p : personList) {
-                stmt.execute(String.format(
-                        "INSERT INTO persons VALUES(%d,'%s','%s','%s',%d)",
-                            p.id, p.firstName, p.lastName, p.gender, p.age));
-            }
-        }
-    }
-
-    private TStream<Person> readPersonsTable(Topology t, JdbcStreams db, List<PersonId> personIdList, int delayMsec) {
-        // Create a stream of Person from a stream of ids
-        TStream<PersonId> personIds = t.collection(personIdList);
-        if (delayMsec!=0) {
-            personIds =  PlumbingStreams.blockingOneShotDelay(personIds,
-                    delayMsec, TimeUnit.MILLISECONDS);
-        }
-        TStream<Person> rcvdPerson = db.executeStatement(personIds,
-                () -> "SELECT id, firstname, lastname, gender, age"
-                        + " FROM persons WHERE id = ?",
-                (tuple,stmt) -> stmt.setInt(1, tuple.id),
-                (tuple,resultSet,exc,stream) -> {
-                    resultSet.next();
-                    int id = resultSet.getInt("id");
-                    String firstName = resultSet.getString("firstname");
-                    String lastName = resultSet.getString("lastname");
-                    String gender = resultSet.getString("gender");
-                    int age = resultSet.getInt("age");
-                    stream.accept(new Person(id, firstName, lastName, gender, age));
-                    }
-                );
-        return rcvdPerson;
-    }
-    
-    private static java.util.function.Predicate<Person> newOddIdPredicate() {
-        return (person) -> person.id % 2 != 0;
-    }
-    
-    private List<String> expectedPersons(java.util.function.Predicate<Person> predicate, List<Person> persons) {
-        return persons.stream()
-                .filter(predicate)
-                .map(person -> person.toString())
-                .collect(Collectors.toList());
-    }
-
-    @Test
-    public void testBasicRead() throws Exception {
-        Topology t = this.newTopology("testBasicRead");
-        
-        populatePersonsTable(getPersonList());
-        List<String> expected = expectedPersons(person->true, getPersonList());
-
-        JdbcStreams db = new JdbcStreams(t,
-                () -> getDataSource(DB_NAME),
-                dataSource -> connect(dataSource));
-
-        // Create a stream of Person from a stream of ids
-        TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 0/*msec*/);
-        TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-        
-        rcvd.sink(tuple -> System.out.println(
-                String.format("%s rcvd: %s", t.getName(), tuple)));
-        completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
-    }
-
-    @Test
-    public void testBasicRead2() throws Exception {
-        Topology t = newTopology("testBasicRead2");
-        // same as testBasic but use the explicit PreparedStatement forms
-        // of executeStatement().
-        
-        populatePersonsTable(getPersonList());
-        List<String> expected = expectedPersons(person->true, getPersonList());
-
-        JdbcStreams db = new JdbcStreams(t,
-                () -> getDataSource(DB_NAME),
-                dataSource -> connect(dataSource));
-
-        // Create a stream of Person from a stream of ids
-        // Delay so this runs after populating the db above
-        TStream<PersonId> personIds =  PlumbingStreams.blockingOneShotDelay(
-                t.collection(getPersonIdList()), 3, TimeUnit.SECONDS);
-        TStream<Person> rcvdPerson = db.executeStatement(personIds,
-                (cn) -> cn.prepareStatement("SELECT id, firstname, lastname, gender, age"
-                        + " FROM persons WHERE id = ?"),
-                (tuple,stmt) -> stmt.setInt(1, tuple.id),
-                (tuple,resultSet,exc,stream) -> {
-                    resultSet.next();
-                    int id = resultSet.getInt("id");
-                    String firstName = resultSet.getString("firstname");
-                    String lastName = resultSet.getString("lastname");
-                    String gender = resultSet.getString("gender");
-                    int age = resultSet.getInt("age");
-                    stream.accept(new Person(id, firstName, lastName, gender, age));
-                    }
-                );
-        TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-        
-        rcvd.sink(tuple -> System.out.println(
-                String.format("%s rcvd: %s", t.getName(), tuple)));
-        completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
-    }
-    
-    @Test
-    public void testBasicWrite() throws Exception {
-        Topology t = newTopology("testBasicWrite");
-        
-        truncatePersonsTable();
-        List<String> expected = expectedPersons(person->true, getPersonList());
-
-        JdbcStreams db = new JdbcStreams(t,
-                () -> getDataSource(DB_NAME),
-                dataSource -> connect(dataSource));
-        
-        // Add stream of Person to the db
-        TStream<Person> s = t.collection(getPersonList());
-        TSink<Person> sink = db.executeStatement(s,
-                () -> "INSERT INTO persons VALUES(?,?,?,?,?)",
-                (tuple,stmt) -> {
-                    stmt.setInt(1, tuple.id);
-                    stmt.setString(2, tuple.firstName);
-                    stmt.setString(3, tuple.lastName);
-                    stmt.setString(4, tuple.gender);
-                    stmt.setInt(5, tuple.age);
-                    }
-                );
-        assertNotNull(sink);
-        
-        // Use the same code as testBasicRead to verify the write worked.
-        TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 3000/*msec*/);
-        TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-        
-        rcvd.sink(tuple -> System.out.println(
-                String.format("%s rcvd: %s", t.getName(), tuple)));
-        completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
-    }
-    
-    @Test
-    public void testBasicWrite2() throws Exception {
-        Topology t = newTopology("testBasicWrite2");
-        // same as testBasicWrite but use the explicit PreparedStatement forms
-        // of executeStatement().
-        
-        truncatePersonsTable();
-        List<String> expected = expectedPersons(person->true, getPersonList());
-
-        JdbcStreams db = new JdbcStreams(t,
-                () -> getDataSource(DB_NAME),
-                dataSource -> connect(dataSource));
-        
-        // Add stream of Person to the db
-        TStream<Person> s = t.collection(getPersonList());
-        TSink<Person> sink = db.executeStatement(s,
-                (cn) -> cn.prepareStatement("INSERT into PERSONS values(?,?,?,?,?)"),
-                (tuple,stmt) -> {
-                    stmt.setInt(1, tuple.id);
-                    stmt.setString(2, tuple.firstName);
-                    stmt.setString(3, tuple.lastName);
-                    stmt.setString(4, tuple.gender);
-                    stmt.setInt(5, tuple.age);
-                    }
-                );
-
-        assertNotNull(sink);
-        
-        // Use the same code as testBasicRead to verify the write worked.
-        TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 3000/*msec*/);
-        TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-        
-        rcvd.sink(tuple -> System.out.println(
-                String.format("%s rcvd: %s", t.getName(), tuple)));
-        completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
-    }
-    
-    @Test
-    public void testBadConnectFn() throws Exception {
-        Topology t = newTopology("testBadConnectFn");
-        // connFn is only called for initial connect or reconnect
-        // following certain failures.
-        // Hence, to exercise transient connFn failures, we need to start
-        // off with a failure.
-        
-        // TODO for transient connection failure cases, simulate a failure
-        // as part of preparedStatement.execute() failing (e.g., force cn-close
-        // right before it?), so that we can verify the conn is closed and
-        // then reconnected
-        
-        populatePersonsTable(getPersonList());
-        List<String> expected = expectedPersons(p->true, getPersonList().subList(1, getPersonList().size()));
-        int expectedExcCnt = getPersonList().size() - expected.size();
-
-        AtomicInteger connFnCnt = new AtomicInteger();
-        JdbcStreams db = new JdbcStreams(t,
-                () -> getDataSource(DB_NAME),
-                dataSource -> {
-                    if (connFnCnt.incrementAndGet() == 1)
-                        throw new SQLException("FAKE-CONNECT-FN-FAILURE");
-                    else
-                        return connect(dataSource);
-                    });
-
-        // Create a stream of Person from a stream of ids
-        AtomicInteger executionExcCnt = new AtomicInteger();
-        TStream<PersonId> personIds = t.collection(getPersonIdList());
-        TStream<Person> rcvdPerson = db.executeStatement(personIds,
-                () -> "SELECT id, firstname, lastname, gender, age"
-                        + " FROM persons WHERE id = ?",
-                (tuple,stmt) -> stmt.setInt(1, tuple.id),
-                (tuple,resultSet,exc,stream) -> {
-                    System.out.println(t.getName()+" resultHandler called tuple="+tuple+" exc="+exc);
-                    if (exc!=null) {
-                        executionExcCnt.incrementAndGet();
-                        return;
-                    }
-                    resultSet.next();
-                    int id = resultSet.getInt("id");
-                    String firstName = resultSet.getString("firstname");
-                    String lastName = resultSet.getString("lastname");
-                    String gender = resultSet.getString("gender");
-                    int age = resultSet.getInt("age");
-                    stream.accept(new Person(id, firstName, lastName, gender, age));
-                    }
-                );
-        TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-        
-        rcvd.sink(tuple -> System.out.println(
-                String.format("%s rcvd: %s", t.getName(), tuple)));
-        completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
-        assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
-    }
-    
-    @Test
-    public void testBadSQL() throws Exception {
-        Topology t = newTopology("testBadSQL");
-        // the statement is nominally "retrieved" only once, not per-tuple.
-        // hence, there's not much sense in trying to simulate it
-        // getting called unsuccessfully, then successfully, etc.
-        // however, verify the result handler gets called appropriately.
-        
-        populatePersonsTable(getPersonList());
-        List<String> expected = Collections.emptyList();
-        int expectedExcCnt = getPersonList().size() - expected.size();
-
-        JdbcStreams db = new JdbcStreams(t,
-                () -> getDataSource(DB_NAME),
-                dataSource -> connect(dataSource));
-
-        // Create a stream of Person from a stream of ids
-        AtomicInteger executionExcCnt = new AtomicInteger();
-        TStream<PersonId> personIds = t.collection(getPersonIdList());
-        TStream<Person> rcvdPerson = db.executeStatement(personIds,
-                () -> "SELECT id, firstname, lastname, gender, age"
-                        + " FROM persons WHERE BOGUS_XYZZY id = ?",
-                (tuple,stmt) -> stmt.setInt(1, tuple.id),
-                (tuple,resultSet,exc,stream) -> {
-                    System.out.println(t.getName()+" resultHandler called tuple="+tuple+" exc="+exc);
-                    if (exc!=null) {
-                        executionExcCnt.incrementAndGet();
-                        return;
-                    }
-                    resultSet.next();
-                    int id = resultSet.getInt("id");
-                    String firstName = resultSet.getString("firstname");
-                    String lastName = resultSet.getString("lastname");
-                    String gender = resultSet.getString("gender");
-                    int age = resultSet.getInt("age");
-                    stream.accept(new Person(id, firstName, lastName, gender, age));
-                    }
-                );
-        TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-        
-        rcvd.sink(tuple -> System.out.println(
-                String.format("%s rcvd: %s", t.getName(), tuple)));
-        completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
-        assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
-    }
-    
-    @Test
-    public void testBadSetParams() throws Exception {
-        Topology t = newTopology("testBadSetParams");
-        // exercise and validate behavior with transient parameter setter failures
-        
-        populatePersonsTable(getPersonList());
-        List<String> expected = expectedPersons(newOddIdPredicate(), getPersonList());
-        int expectedExcCnt = getPersonList().size() - expected.size();
-
-        JdbcStreams db = new JdbcStreams(t,
-                () -> getDataSource(DB_NAME),
-                dataSource -> connect(dataSource));
-
-        // Create a stream of Person from a stream of ids
-        AtomicInteger executionExcCnt = new AtomicInteger();
-        TStream<PersonId> personIds = t.collection(getPersonIdList());
-        TStream<Person> rcvdPerson = db.executeStatement(personIds,
-                () -> "SELECT id, firstname, lastname, gender, age"
-                        + " FROM persons WHERE id = ?",
-                (tuple,stmt) -> { if (tuple.id % 2 != 0)
-                                    stmt.setInt(1, tuple.id);
-                                  else
-                                    stmt.setString(1, "THIS-IS-BOGUS"); },
-                (tuple,resultSet,exc,stream) -> {
-                    System.out.println(t.getName()+" resultHandler called tuple="+tuple+" exc="+exc);
-                    if (exc!=null) {
-                        executionExcCnt.incrementAndGet();
-                        return;
-                    }
-                    resultSet.next();
-                    int id = resultSet.getInt("id");
-                    String firstName = resultSet.getString("firstname");
-                    String lastName = resultSet.getString("lastname");
-                    String gender = resultSet.getString("gender");
-                    int age = resultSet.getInt("age");
-                    stream.accept(new Person(id, firstName, lastName, gender, age));
-                    }
-                );
-        TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-        
-        rcvd.sink(tuple -> System.out.println(
-                String.format("%s rcvd: %s", t.getName(), tuple)));
-        completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
-        assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
-    }
-    
-    @Test
-    public void testBadResultHandler() throws Exception {
-        Topology t = newTopology("testBadResultHandler");
-        // exercise and validate behavior with transient result handler failures
-        
-        populatePersonsTable(getPersonList());
-        List<String> expected = expectedPersons(newOddIdPredicate(), getPersonList());
-        int expectedExcCnt = getPersonList().size() - expected.size();
-
-        JdbcStreams db = new JdbcStreams(t,
-                () -> getDataSource(DB_NAME),
-                dataSource -> connect(dataSource));
-
-        // Create a stream of Person from a stream of ids
-        AtomicInteger executionExcCnt = new AtomicInteger();
-        TStream<PersonId> personIds = t.collection(getPersonIdList());
-        TStream<Person> rcvdPerson = db.executeStatement(personIds,
-                () -> "SELECT id, firstname, lastname, gender, age"
-                        + " FROM persons WHERE id = ?",
-                (tuple,stmt) -> stmt.setInt(1, tuple.id),
-                (tuple,resultSet,exc,stream) -> {
-                    System.out.println(t.getName()+" resultHandler called tuple="+tuple+" exc="+exc);
-                    if (exc!=null) {
-                        executionExcCnt.incrementAndGet();
-                        return;
-                    }
-                    resultSet.next();
-                    int id = resultSet.getInt(tuple.id % 2 == 0
-                                                ? "ID-THIS-IS-BOGUS" : "id");
-                    String firstName = resultSet.getString("firstname");
-                    String lastName = resultSet.getString("lastname");
-                    String gender = resultSet.getString("gender");
-                    int age = resultSet.getInt("age");
-                    stream.accept(new Person(id, firstName, lastName, gender, age));
-                    }
-                );
-        TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
-        
-        rcvd.sink(tuple -> System.out.println(
-                String.format("%s rcvd: %s", t.getName(), tuple)));
-        completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
-        assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
-    }
-    
-    
-    @Test
-    public void testNonResultSetStmt() throws Exception {
-        Topology t = newTopology("testNonResultSetStmt");
-        // exercise and validate use of non-ResultSet SQL statement
-        // wrt proper resultHandler behavior - e.g., receive exception,
-        // can generate tuples
-        
-        List<String> expected = Arrays.asList("once");
-
-        // throw if can't create DataSource - e.g., can't locate derby
-        getDataSource(DB_NAME);
-
-        JdbcStreams db = new JdbcStreams(t,
-                () -> getDataSource(DB_NAME),
-                dataSource -> connect(dataSource));
-        
-        // Single-tuple stream that triggers the chain of SQL statements below
-        TStream<String> trigger = t.collection(expected);
-        TStream<String> dropTableStep = db.executeStatement(trigger,
-                () -> "DROP TABLE swill",
-                (tuple,stmt) -> { /* no params */ },
-                (tuple,rs,exc,consumer) -> {
-                        // ok if fails
-                        System.out.println(t.getName()+" resultHandler drop table exc="+exc);
-                        if (rs!=null)
-                            throw new IllegalStateException("rs!=null");
-                        consumer.accept(tuple);
-                    }
-                );
-        TStream<String> createTableStep = db.executeStatement(dropTableStep,
-                () -> "CREATE TABLE swill (id INTEGER NOT NULL)",
-                (tuple,stmt) -> { /* no params */ },
-                (tuple,rs,exc,consumer) -> {
-                        System.out.println(t.getName()+" resultHandler create table exc="+exc);
-                        if (rs!=null)
-                            throw new IllegalStateException("rs!=null");
-                        consumer.accept(tuple);
-                    }
-                );
-        TStream<String> failDropTable = db.executeStatement(createTableStep,
-                () -> "DROP TABLE no_such_table",
-                (tuple,stmt) -> { /* no params */ },
-                (tuple,rs,exc,consumer) -> {
-                        System.out.println(t.getName()+" resultHandler fail drop table exc="+exc);
-                        if (exc==null)
-                            throw new IllegalStateException("exc==null");
-                        if (rs!=null)
-                            throw new IllegalStateException("rs!=null");
-                        consumer.accept(tuple);
-                    }
-                );
-        TStream<String> selectStep = db.executeStatement(failDropTable,
-                () -> "SELECT * FROM swill",
-                (tuple,stmt) -> { /* no params */ },
-                (tuple,rs,exc,consumer) -> {
-                        System.out.println(t.getName()+" resultHandler select exc="+exc);
-                        if (rs==null)
-                            throw new IllegalStateException("rs==null");
-                        consumer.accept(tuple);
-                    }
-                );
-        TStream<String> rcvd = selectStep;
-                
-        completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/jdbc/src/test/java/org/apache/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/test/java/org/apache/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java b/connectors/jdbc/src/test/java/org/apache/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java
new file mode 100644
index 0000000..4f1cacb
--- /dev/null
+++ b/connectors/jdbc/src/test/java/org/apache/edgent/test/connectors/jdbc/JdbcStreamsGlobalTest.java
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.jdbc;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * JdbcStreams connector globalization tests.
+ * <p>
+ * The tests use Apache Embedded Derby as the backing dbms.
+ * The Oracle JDK includes Derby in $JAVA_HOME/db.
+ * Manually install Derby for other JDKs if required.
+ * Arrange for the classpath to be configured by one of:
+ * <ul>
+ * <li>set the DERBY_HOME environment variable.  build.xml adds
+ *     $DERBY_HOME/lib/derby.jar to the classpath when running the tests.</li>
+ * <li>manually add derby.jar to the classpath</li>
+ * </ul>
+ * The tests are "skipped" if the dbms's jdbc driver can't be found.
+ */
+public class JdbcStreamsGlobalTest extends JdbcStreamsTest {
+
+    private static final List<Person> globalPersonList = new ArrayList<>();
+    static {
+        globalPersonList.add(new Person(1, "\u7ea6\u7ff0", "\u674e", "male", 35));
+        globalPersonList.add(new Person(2, "\u7b80", "\u674e", "female", 29));
+        globalPersonList.add(new Person(3, "\u6bd4\u5229", "\u5468", "male", 3));
+    }
+    private static final List<PersonId> globalPersonIdList = new ArrayList<>();
+    static {
+        for(Person p : globalPersonList) {
+            globalPersonIdList.add(new PersonId(p.id));
+        }
+    }
+
+    public List<Person> getPersonList() {
+        return globalPersonList;
+    }
+
+    public List<PersonId> getPersonIdList() {
+        return globalPersonIdList;
+    }
+
+}