You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/09/01 17:19:07 UTC

[tika] branch main updated (8a45dac52 -> b8745d1e7)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


    from 8a45dac52 TIKA-3845 -- add a CallablePipesIterator
     new 7e054f49d TIKA-3846 allow a strategy for attachments in the jdbc emitter
     new b8745d1e7 determine jdbcemitter is not yet thread safe; improve documentation and remove code that incorrectly hopes for thread safety.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../tika/pipes/emitter/jdbc/JDBCEmitter.java       | 162 +++++++++++++++------
 .../tika/pipes/emitter/jdbc/JDBCEmitterTest.java   |  48 +++++-
 ...ml => tika-config-jdbc-emitter-attachments.xml} |  10 +-
 .../tika-config-jdbc-emitter-existing-table.xml    |   2 +-
 .../resources/configs/tika-config-jdbc-emitter.xml |   4 +-
 5 files changed, 171 insertions(+), 55 deletions(-)
 copy tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/{tika-config-jdbc-emitter.xml => tika-config-jdbc-emitter-attachments.xml} (80%)


[tika] 02/02: determine jdbcemitter is not yet thread safe; improve documentation and remove code that incorrectly hopes for thread safety.

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit b8745d1e7f58119cea8d2422dbff7e1edb4e0bb4
Author: tallison <ta...@apache.org>
AuthorDate: Thu Sep 1 13:18:53 2022 -0400

    determine jdbcemitter is not yet thread safe; improve documentation and remove code that incorrectly hopes for thread safety.
---
 .../tika/pipes/emitter/jdbc/JDBCEmitter.java       | 137 ++++++++++-----------
 1 file changed, 64 insertions(+), 73 deletions(-)

diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
index 7cd1bbaa6..330515858 100644
--- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
@@ -24,8 +24,10 @@ import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -44,10 +46,9 @@ import org.apache.tika.utils.StringUtils;
 
 /**
  * This is only an initial, basic implementation of an emitter for JDBC.
- * For now, it only processes the first metadata object in the list.
  * <p>
- * Later implementations may handle embedded files along the lines of
- * the OpenSearch/Solr emitters.
+ * It is currently NOT thread safe because of the shared prepared statement,
+ * and, depending on the JDBC implementation, because of the shared connection.
  */
 public class JDBCEmitter extends AbstractEmitter implements Initializable, Closeable {
 
@@ -57,9 +58,14 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
     }
 
     private static final Logger LOGGER = LoggerFactory.getLogger(JDBCEmitter.class);
-    //the "write" lock is used to make the connection and to configure the insertstatement
-    //the "read" lock is used for preparing the insert and inserting
+    //the "write" lock is used for creating the table
     private static ReadWriteLock READ_WRITE_LOCK = new ReentrantReadWriteLock();
+
+    //this keeps track of which connection string + create table statement
+    //pairs have already been executed, so that each table is created only once
+    //per connection string. This is necessary for testing and if someone
+    //specifies multiple different jdbc emitters.
+    private static Set<String> TABLES_CREATED = new HashSet<>();
     private String connectionString;
     private String insert;
     private String createTable;
@@ -70,17 +76,18 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
 
     private AttachmentStrategy attachmentStrategy = AttachmentStrategy.FIRST_ONLY;
 
-    private volatile boolean initialized = false;
 
     /**
      * This is called immediately after the table is created.
      * The purpose of this is to allow for adding a complex primary key or
      * other constraint on the table after it is created.
+     *
      * @param alterTable
      */
     public void setAlterTable(String alterTable) {
         this.alterTable = alterTable;
     }
+
     @Field
     public void setCreateTable(String createTable) {
         this.createTable = createTable;
@@ -124,6 +131,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
             throw new IllegalArgumentException("attachmentStrategy must be 'all' or 'first_only'");
         }
     }
+
     @Override
     public void emit(String emitKey, List<Metadata> metadataList)
             throws IOException, TikaEmitterException {
@@ -138,26 +146,18 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
     }
 
     private void emitAll(String emitKey, List<Metadata> metadataList) throws TikaEmitterException {
-        //we aren't currently batching inserts
-        //because of risk of crashing in pipes handler.
-        READ_WRITE_LOCK.readLock().lock();
         try {
-            try {
-                for (int i = 0; i < metadataList.size(); i++) {
-                    insertStatement.clearParameters();
-                    int col = 0;
-                    insertStatement.setString(++col, emitKey);
-                    insertStatement.setInt(++col, i);
-                    for (Map.Entry<String, String> e : keys.entrySet()) {
-                        updateValue(insertStatement, ++col, e.getKey(), e.getValue(),
-                                i, metadataList);
-                    }
-                    insertStatement.addBatch();
+            for (int i = 0; i < metadataList.size(); i++) {
+                insertStatement.clearParameters();
+                int col = 0;
+                insertStatement.setString(++col, emitKey);
+                insertStatement.setInt(++col, i);
+                for (Map.Entry<String, String> e : keys.entrySet()) {
+                    updateValue(insertStatement, ++col, e.getKey(), e.getValue(), i, metadataList);
                 }
-                insertStatement.executeBatch();
-            } finally {
-                READ_WRITE_LOCK.readLock().unlock();
+                insertStatement.addBatch();
             }
+            insertStatement.executeBatch();
         } catch (SQLException e) {
             try {
                 LOGGER.warn("problem during emit; going to try to reconnect", e);
@@ -172,22 +172,17 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
 
     }
 
-    private void emitFirstOnly(String emitKey, List<Metadata> metadataList) throws TikaEmitterException {
-        //we aren't currently batching inserts
-        //because of risk of crashing in pipes handler.
-        READ_WRITE_LOCK.readLock().lock();
+    private void emitFirstOnly(String emitKey, List<Metadata> metadataList)
+            throws TikaEmitterException {
+
         try {
-            try {
-                insertStatement.clearParameters();
-                int i = 0;
-                insertStatement.setString(++i, emitKey);
-                for (Map.Entry<String, String> e : keys.entrySet()) {
-                    updateValue(insertStatement, ++i, e.getKey(), e.getValue(), 0, metadataList);
-                }
-                insertStatement.execute();
-            } finally {
-                READ_WRITE_LOCK.readLock().unlock();
+            insertStatement.clearParameters();
+            int i = 0;
+            insertStatement.setString(++i, emitKey);
+            for (Map.Entry<String, String> e : keys.entrySet()) {
+                updateValue(insertStatement, ++i, e.getKey(), e.getValue(), 0, metadataList);
             }
+            insertStatement.execute();
         } catch (SQLException e) {
             try {
                 LOGGER.warn("problem during emit; going to try to reconnect", e);
@@ -204,27 +199,22 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
 
     private void reconnect() throws SQLException {
         SQLException ex = null;
-        try {
-            READ_WRITE_LOCK.writeLock().lock();
-            for (int i = 0; i < 3; i++) {
-                try {
-                    connection = DriverManager.getConnection(connectionString);
-                    insertStatement = connection.prepareStatement(insert);
-                    return;
-                } catch (SQLException e) {
-                    LOGGER.warn("couldn't reconnect to db", e);
-                    ex = e;
-                }
+        for (int i = 0; i < 3; i++) {
+            try {
+                connection = DriverManager.getConnection(connectionString);
+                insertStatement = connection.prepareStatement(insert);
+                return;
+            } catch (SQLException e) {
+                LOGGER.warn("couldn't reconnect to db", e);
+                ex = e;
             }
-        } finally {
-            READ_WRITE_LOCK.writeLock().unlock();
         }
         throw ex;
     }
 
     private void updateValue(PreparedStatement insertStatement, int i, String key, String type,
-                             int metadataListIndex,
-                             List<Metadata> metadataList) throws SQLException {
+                             int metadataListIndex, List<Metadata> metadataList)
+            throws SQLException {
         //for now we're only taking the info from the container document.
         Metadata metadata = metadataList.get(metadataListIndex);
         String val = metadata.get(key);
@@ -293,33 +283,33 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
         } catch (SQLException e) {
             throw new TikaConfigException("couldn't open connection: " + connectionString, e);
         }
-        try {
+        if (!StringUtils.isBlank(createTable)) {
+            //synchronize table creation
             READ_WRITE_LOCK.writeLock().lock();
-            if (!initialized && !StringUtils.isBlank(createTable)) {
-                try (Statement st = connection.createStatement()) {
-                    st.execute(createTable);
-                    if (!StringUtils.isBlank(alterTable)) {
-                        st.execute(alterTable);
-                    }
-                    if (! connection.getAutoCommit()) {
-                        connection.commit();
+            try {
+                String tableCreationString = connectionString + " " + createTable;
+                if (!TABLES_CREATED.contains(tableCreationString)) {
+                    try (Statement st = connection.createStatement()) {
+                        st.execute(createTable);
+                        if (!StringUtils.isBlank(alterTable)) {
+                            st.execute(alterTable);
+                        }
+                        TABLES_CREATED.add(tableCreationString);
+                    } catch (SQLException e) {
+                        throw new TikaConfigException("can't create table", e);
                     }
-                    connection.commit();
-                    initialized = true;
-                } catch (SQLException e) {
-                    throw new TikaConfigException("can't create table", e);
                 }
+            } finally {
+                READ_WRITE_LOCK.writeLock().unlock();
             }
-
-            try {
-                insertStatement = connection.prepareStatement(insert);
-            } catch (SQLException e) {
-                throw new TikaConfigException("can't create insert statement", e);
-            }
-        } finally {
-            READ_WRITE_LOCK.writeLock().unlock();
+        }
+        try {
+            insertStatement = connection.prepareStatement(insert);
+        } catch (SQLException e) {
+            throw new TikaConfigException("can't create insert statement", e);
         }
 
+
     }
 
     @Override
@@ -332,6 +322,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
         TODO: This is currently not ever called.  We need rework the PipesParser
         to ensure that emitters are closed cleanly.
      */
+
     /**
      * @throws IOException
      */


[tika] 01/02: TIKA-3846 allow a strategy for attachments in the jdbc emitter

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 7e054f49d910296dd0ef936453e276b342452669
Author: tallison <ta...@apache.org>
AuthorDate: Thu Sep 1 12:39:41 2022 -0400

    TIKA-3846 allow a strategy for attachments in the jdbc emitter
---
 .../tika/pipes/emitter/jdbc/JDBCEmitter.java       | 111 ++++++++++++++++++---
 .../tika/pipes/emitter/jdbc/JDBCEmitterTest.java   |  48 ++++++++-
 ...ml => tika-config-jdbc-emitter-attachments.xml} |  10 +-
 .../tika-config-jdbc-emitter-existing-table.xml    |   2 +-
 .../resources/configs/tika-config-jdbc-emitter.xml |   4 +-
 5 files changed, 150 insertions(+), 25 deletions(-)

diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
index 85b3449e2..7cd1bbaa6 100644
--- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
@@ -51,19 +51,36 @@ import org.apache.tika.utils.StringUtils;
  */
 public class JDBCEmitter extends AbstractEmitter implements Initializable, Closeable {
 
+    public enum AttachmentStrategy {
+        FIRST_ONLY, ALL
+        //anything else?
+    }
+
     private static final Logger LOGGER = LoggerFactory.getLogger(JDBCEmitter.class);
-    private static volatile boolean INITIALIZED = false;
     //the "write" lock is used to make the connection and to configure the insertstatement
     //the "read" lock is used for preparing the insert and inserting
     private static ReadWriteLock READ_WRITE_LOCK = new ReentrantReadWriteLock();
     private String connectionString;
     private String insert;
     private String createTable;
-    private String idColumn;
+    private String alterTable;
     private Map<String, String> keys;
     private Connection connection;
     private PreparedStatement insertStatement;
 
+    private AttachmentStrategy attachmentStrategy = AttachmentStrategy.FIRST_ONLY;
+
+    private volatile boolean initialized = false;
+
+    /**
+     * This is called immediately after the table is created.
+     * The purpose of this is to allow for adding a complex primary key or
+     * other constraint on the table after it is created.
+     * @param alterTable
+     */
+    public void setAlterTable(String alterTable) {
+        this.alterTable = alterTable;
+    }
     @Field
     public void setCreateTable(String createTable) {
         this.createTable = createTable;
@@ -74,11 +91,6 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
         this.insert = insert;
     }
 
-    @Field
-    public void setIdColumn(String idColumn) {
-        this.idColumn = idColumn;
-    }
-
     @Field
     public void setConnection(String connectionString) {
         this.connectionString = connectionString;
@@ -98,12 +110,69 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
         this.keys = keys;
     }
 
+    public void setAttachmentStrategy(AttachmentStrategy attachmentStrategy) {
+        this.attachmentStrategy = attachmentStrategy;
+    }
+
+    @Field
+    public void setAttachmentStrategy(String attachmentStrategy) {
+        if ("all".equalsIgnoreCase(attachmentStrategy)) {
+            setAttachmentStrategy(AttachmentStrategy.ALL);
+        } else if ("first_only".equalsIgnoreCase(attachmentStrategy)) {
+            setAttachmentStrategy(AttachmentStrategy.FIRST_ONLY);
+        } else {
+            throw new IllegalArgumentException("attachmentStrategy must be 'all' or 'first_only'");
+        }
+    }
     @Override
     public void emit(String emitKey, List<Metadata> metadataList)
             throws IOException, TikaEmitterException {
         if (metadataList == null || metadataList.size() < 1) {
             return;
         }
+        if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) {
+            emitFirstOnly(emitKey, metadataList);
+        } else {
+            emitAll(emitKey, metadataList);
+        }
+    }
+
+    private void emitAll(String emitKey, List<Metadata> metadataList) throws TikaEmitterException {
+        //we aren't currently batching inserts
+        //because of risk of crashing in pipes handler.
+        READ_WRITE_LOCK.readLock().lock();
+        try {
+            try {
+                for (int i = 0; i < metadataList.size(); i++) {
+                    insertStatement.clearParameters();
+                    int col = 0;
+                    insertStatement.setString(++col, emitKey);
+                    insertStatement.setInt(++col, i);
+                    for (Map.Entry<String, String> e : keys.entrySet()) {
+                        updateValue(insertStatement, ++col, e.getKey(), e.getValue(),
+                                i, metadataList);
+                    }
+                    insertStatement.addBatch();
+                }
+                insertStatement.executeBatch();
+            } finally {
+                READ_WRITE_LOCK.readLock().unlock();
+            }
+        } catch (SQLException e) {
+            try {
+                LOGGER.warn("problem during emit; going to try to reconnect", e);
+                //something went wrong
+                //try to reconnect
+                reconnect();
+            } catch (SQLException ex) {
+                throw new TikaEmitterException("Couldn't reconnect!", ex);
+            }
+            throw new TikaEmitterException("couldn't emit", e);
+        }
+
+    }
+
+    private void emitFirstOnly(String emitKey, List<Metadata> metadataList) throws TikaEmitterException {
         //we aren't currently batching inserts
         //because of risk of crashing in pipes handler.
         READ_WRITE_LOCK.readLock().lock();
@@ -113,7 +182,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
                 int i = 0;
                 insertStatement.setString(++i, emitKey);
                 for (Map.Entry<String, String> e : keys.entrySet()) {
-                    updateValue(insertStatement, ++i, e.getKey(), e.getValue(), metadataList);
+                    updateValue(insertStatement, ++i, e.getKey(), e.getValue(), 0, metadataList);
                 }
                 insertStatement.execute();
             } finally {
@@ -130,6 +199,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
             }
             throw new TikaEmitterException("couldn't emit", e);
         }
+
     }
 
     private void reconnect() throws SQLException {
@@ -153,9 +223,10 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
     }
 
     private void updateValue(PreparedStatement insertStatement, int i, String key, String type,
+                             int metadataListIndex,
                              List<Metadata> metadataList) throws SQLException {
         //for now we're only taking the info from the container document.
-        Metadata metadata = metadataList.get(0);
+        Metadata metadata = metadataList.get(metadataListIndex);
         String val = metadata.get(key);
         switch (type) {
             case "string":
@@ -224,22 +295,30 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
         }
         try {
             READ_WRITE_LOCK.writeLock().lock();
-            if (!INITIALIZED && !StringUtils.isBlank(createTable)) {
+            if (!initialized && !StringUtils.isBlank(createTable)) {
                 try (Statement st = connection.createStatement()) {
                     st.execute(createTable);
-                    INITIALIZED = true;
+                    if (!StringUtils.isBlank(alterTable)) {
+                        st.execute(alterTable);
+                    }
+                    if (! connection.getAutoCommit()) {
+                        connection.commit();
+                    }
+                    connection.commit();
+                    initialized = true;
                 } catch (SQLException e) {
                     throw new TikaConfigException("can't create table", e);
                 }
             }
+
+            try {
+                insertStatement = connection.prepareStatement(insert);
+            } catch (SQLException e) {
+                throw new TikaConfigException("can't create insert statement", e);
+            }
         } finally {
             READ_WRITE_LOCK.writeLock().unlock();
         }
-        try {
-            insertStatement = connection.prepareStatement(insert);
-        } catch (SQLException e) {
-            throw new TikaConfigException("can't create insert statement", e);
-        }
 
     }
 
diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java
index cbe6ba374..a34251ebc 100644
--- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java
+++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java
@@ -29,6 +29,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 
 import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.Test;
@@ -58,7 +59,7 @@ public class JDBCEmitterTest {
         data.add(new String[]{"k1", "true", "k2", "some string3", "k3", "6", "k4", "102"});
         int id = 0;
         for (String[] d : data) {
-            emitter.emit("id" + id++, m(d));
+            emitter.emit("id" + id++, Collections.singletonList(m(d)));
         }
 
         try (Connection connection = DriverManager.getConnection(connectionString)) {
@@ -101,7 +102,7 @@ public class JDBCEmitterTest {
         data.add(new String[]{"k1", "true", "k2", "some string3", "k3", "6", "k4", "102"});
         int id = 0;
         for (String[] d : data) {
-            emitter.emit("id" + id++, m(d));
+            emitter.emit("id" + id++, Collections.singletonList(m(d)));
         }
 
         try (Connection connection = DriverManager.getConnection(connectionString)) {
@@ -119,7 +120,46 @@ public class JDBCEmitterTest {
                 }
             }
         }
+    }
+
+    @Test
+    public void testAttachments(@TempDir Path tmpDir) throws Exception {
+        Files.createDirectories(tmpDir.resolve("db"));
+        Path dbDir = tmpDir.resolve("db/h2");
+        Path config = tmpDir.resolve("tika-config.xml");
+        String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
+        writeConfig("/configs/tika-config-jdbc-emitter-attachments.xml",
+                connectionString, config);
+
+        EmitterManager emitterManager = EmitterManager.load(config);
+        Emitter emitter = emitterManager.getEmitter();
+        List<Metadata> data = new ArrayList<>();
+        data.add(m("k1", "true", "k2", "some string1", "k3", "4", "k4", "100"));
+        data.add(m("k1", "false", "k2", "some string2", "k3", "5", "k4", "101"));
+        data.add(m("k1", "true", "k2", "some string3", "k3", "6", "k4", "102"));
+        emitter.emit("id0", data);
 
+
+        try (Connection connection = DriverManager.getConnection(connectionString)) {
+            try (Statement st = connection.createStatement()) {
+                try (ResultSet rs = st.executeQuery("select * from test")) {
+                    int rows = 0;
+                    assertEquals("path", rs.getMetaData().getColumnName(1).toLowerCase(Locale.US));
+                    assertEquals("attachment_num",
+                            rs.getMetaData().getColumnName(2).toLowerCase(Locale.US));
+                    while (rs.next()) {
+                        assertEquals("id0", rs.getString(1));
+                        assertEquals(rows, rs.getInt(2));
+                        assertEquals(rows % 2 == 0, rs.getBoolean(3));
+                        assertEquals("some string" + (rows + 1), rs.getString(4));
+                        assertEquals(rows + 4, rs.getInt(5));
+                        assertEquals(100 + rows, rs.getLong(6));
+                        rows++;
+                    }
+                }
+            }
+        }
     }
 
     private void writeConfig(String srcConfig, String dbDir, Path config) throws IOException {
@@ -128,11 +168,11 @@ public class JDBCEmitterTest {
         Files.write(config, xml.getBytes(StandardCharsets.UTF_8));
     }
 
-    private List<Metadata> m(String... strings) {
+    private Metadata m(String... strings) {
         Metadata metadata = new Metadata();
         for (int i = 0; i < strings.length; i++) {
             metadata.set(strings[i], strings[++i]);
         }
-        return Collections.singletonList(metadata);
+        return metadata;
     }
 }
diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter.xml b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-attachments.xml
similarity index 80%
copy from tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter.xml
copy to tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-attachments.xml
index 04206740b..93130db65 100644
--- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-attachments.xml
@@ -24,14 +24,17 @@
         <name>jdbc</name>
         <connection>CONNECTION_STRING</connection>
         <createTable>create table test
-          (path varchar(512) primary key,
+          (path varchar(512) not null,
+          attachment_num integer not null,
           k1 boolean,
           k2 varchar(512),
           k3 integer,
           k4 long);
         </createTable>
-        <idColumn>path</idColumn>
-        <insert>insert into test (path, k1, k2, k3, k4) values (?,?,?,?,?);
+        <alterTable>alter table test add primary key (path, attachment_num)</alterTable>
+        <!-- the jdbc emitter always puts the emitKey value as the first
+             item -->
+        <insert>insert into test (path, attachment_num, k1, k2, k3, k4) values (?,?,?,?,?,?);
         </insert>
         <!-- these are the keys in the metadata object.
             The emitKey is added as the first element in the insert statement.
@@ -45,6 +48,7 @@
           <key k="k3" v="int"/>
           <key k="k4" v="long"/>
         </keys>
+        <attachmentStrategy>all</attachmentStrategy>
       </params>
     </emitter>
   </emitters>
diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-existing-table.xml b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-existing-table.xml
index 040d98386..3b9befa66 100644
--- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-existing-table.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-existing-table.xml
@@ -23,7 +23,6 @@
       <params>
         <name>jdbc</name>
         <connection>CONNECTION_STRING</connection>
-        <idColumn>path</idColumn>
         <insert>insert into test (path, k1, k2, k3, k4) values (?,?,?,?,?);
         </insert>
         <!-- these are the keys in the metadata object.
@@ -38,6 +37,7 @@
           <key k="k3" v="int"/>
           <key k="k4" v="long"/>
         </keys>
+        <attachmentStrategy>first_only</attachmentStrategy>
       </params>
     </emitter>
   </emitters>
diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter.xml b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter.xml
index 04206740b..d86903992 100644
--- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter.xml
@@ -30,7 +30,8 @@
           k3 integer,
           k4 long);
         </createTable>
-        <idColumn>path</idColumn>
+        <!-- the jdbc emitter always puts the emitKey value as the first
+             item -->
         <insert>insert into test (path, k1, k2, k3, k4) values (?,?,?,?,?);
         </insert>
         <!-- these are the keys in the metadata object.
@@ -45,6 +46,7 @@
           <key k="k3" v="int"/>
           <key k="k4" v="long"/>
         </keys>
+        <attachmentStrategy>first_only</attachmentStrategy>
       </params>
     </emitter>
   </emitters>