You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/03/01 20:34:30 UTC

[tika] 02/02: WIP -- do not merge...allow instance credentials in s3 components

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 221b29d90ecc80d8d59cb0bd9604a925e8b18c67
Author: tballison <ta...@apache.org>
AuthorDate: Mon Mar 1 15:34:14 2021 -0500

    WIP -- do not merge...allow instance credentials in s3 components
---
 .../tika/exception/TikaTimeoutException.java       |  26 +++++
 .../tika/pipes/fetchiterator/FetchIterator.java    | 123 +++++++++++++++------
 .../fetchiterator/FileSystemFetchIterator.java     |   3 +-
 .../fetchiterator/FileSystemFetchIteratorTest.java |  24 +---
 .../apache/tika/pipes/emitter/s3/S3Emitter.java    |  51 +++++----
 .../src/test/java/TestCSVFetchIterator.java        |  35 +++---
 .../fetchiterator/jdbc/JDBCFetchIterator.java      |  12 +-
 .../fetchiterator/jdbc/TestJDBCFetchIterator.java  |  17 ++-
 .../pipes/fetchiterator/s3/S3FetchIterator.java    |  23 +++-
 .../fetchiterator/s3/TestS3FetchIterator.java      |   9 +-
 .../apache/tika/pipes/fetcher/s3/S3Fetcher.java    |  97 +++++++++-------
 .../java/org/apache/tika/pipes/async/AsyncCli.java |   1 -
 .../org/apache/tika/pipes/async/AsyncConfig.java   |  54 +++++++++
 .../async/{AsyncCli.java => AsyncProcessor.java}   |  89 +++++++++------
 .../org/apache/tika/pipes/driver/AsyncCliTest.java |  35 ++++++
 15 files changed, 414 insertions(+), 185 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/exception/TikaTimeoutException.java b/tika-core/src/main/java/org/apache/tika/exception/TikaTimeoutException.java
new file mode 100644
index 0000000..a53dbd6
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/exception/TikaTimeoutException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.exception;
+
+/**
+ * Runtime/unchecked version of {@link java.util.concurrent.TimeoutException}
+ */
+public class TikaTimeoutException extends RuntimeException {
+    public TikaTimeoutException(String message) {
+        super(message);
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
index 4794691..a92668d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
@@ -21,53 +21,46 @@ import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaTimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /**
  * Abstract class that handles the testing for timeouts/thread safety
  * issues.  Concrete classes implement the blocking {@link #enqueue()}.
- * <p>
- * This must be "called", obviously...
+ * If there's an exception in the enqueuing thread, this will throw
+ * a RuntimeException.  It will throw an IllegalStateException if
+ * next() is called after hasNext() has returned false.
  */
-public abstract class FetchIterator implements Callable<Integer>, Initializable {
+public abstract class FetchIterator implements Callable<Integer>,
+        Iterable<FetchEmitTuple>, Initializable {
 
     public static final long DEFAULT_MAX_WAIT_MS = 300_000;
     public static final int DEFAULT_QUEUE_SIZE = 1000;
+
     public static final FetchEmitTuple COMPLETED_SEMAPHORE =
             new FetchEmitTuple(null, null, null);
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(FetchIterator.class);
+
     private long maxWaitMs = DEFAULT_MAX_WAIT_MS;
-    private int numConsumers = -1;
     private ArrayBlockingQueue<FetchEmitTuple> queue = null;
     private int queueSize = DEFAULT_QUEUE_SIZE;
     private String fetcherName;
     private String emitterName;
     private int added = 0;
     private FetchEmitTuple.ON_PARSE_EXCEPTION onParseException = FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
-
-    public FetchIterator() {
-
-    }
-
-    public FetchIterator(String fetcherName) {
-        this.fetcherName = fetcherName;
-    }
-
-    /**
-     * This must be called before 'calling' this object.
-     * @param numConsumers
-     */
-    public ArrayBlockingQueue<FetchEmitTuple> init(int numConsumers) {
-        this.queue = new ArrayBlockingQueue<>(queueSize);
-        this.numConsumers = numConsumers;
-        return queue;
-    }
+    private FutureTask<Integer> futureTask;
 
     @Field
     public void setFetcherName(String fetcherName) {
@@ -117,26 +110,16 @@ public abstract class FetchIterator implements Callable<Integer>, Initializable
         return onParseException;
     }
 
-    @Override
     public Integer call() throws Exception {
-        if (queue == null || numConsumers < 0) {
-            throw new IllegalStateException("Must call 'init' before calling this object");
-        }
-        System.out.println("fetch iterator");
         enqueue();
-        System.out.println("fetch iterator finshed enqueing");
-        for (int i = 0; i < numConsumers; i++) {
-            tryToAdd(COMPLETED_SEMAPHORE);
-        }
+        tryToAdd(COMPLETED_SEMAPHORE);
         return added;
     }
 
     protected abstract void enqueue() throws IOException, TimeoutException, InterruptedException;
 
     protected void tryToAdd(FetchEmitTuple p) throws InterruptedException, TimeoutException {
-        if (p != COMPLETED_SEMAPHORE) {
-            added++;
-        }
+        added++;
         boolean offered = queue.offer(p, maxWaitMs, TimeUnit.MILLISECONDS);
         if (!offered) {
             throw new TimeoutException("timed out while offering");
@@ -153,4 +136,76 @@ public abstract class FetchIterator implements Callable<Integer>, Initializable
         //no-op
     }
 
+    @Override
+    public Iterator<FetchEmitTuple> iterator() {
+        if (futureTask != null) {
+            throw new IllegalStateException("Can't call iterator more than once!");
+        }
+        futureTask = new FutureTask<>(this);
+        queue = new ArrayBlockingQueue<>(queueSize);
+        new Thread(futureTask).start();
+        return new TupleIterator();
+    }
+
+    private class TupleIterator implements Iterator<FetchEmitTuple> {
+        FetchEmitTuple next = null;
+
+        @Override
+        public boolean hasNext() {
+            if (next == null) {
+                next = pollNext();
+            }
+            return next != COMPLETED_SEMAPHORE;
+        }
+
+        @Override
+        public FetchEmitTuple next() {
+            if (next == COMPLETED_SEMAPHORE) {
+                throw new IllegalStateException(
+                        "don't call next() after hasNext() has returned false!");
+            }
+            FetchEmitTuple ret = next;
+            next = pollNext();
+            return ret;
+        }
+
+        private FetchEmitTuple pollNext() throws TikaTimeoutException {
+
+            FetchEmitTuple t = null;
+            long start = System.currentTimeMillis();
+            try {
+                long elapsed = System.currentTimeMillis()-start;
+                while (t == null && elapsed < maxWaitMs) {
+                    checkThreadOk();
+                    t = queue.poll(100, TimeUnit.MILLISECONDS);
+                    elapsed = System.currentTimeMillis()-start;
+                }
+            } catch (InterruptedException e) {
+                LOGGER.warn("interrupted");
+                return COMPLETED_SEMAPHORE;
+            }
+            if (t == null) {
+                throw new TikaTimeoutException("waited longer than "+
+                        maxWaitMs+"ms for the next tuple");
+            }
+            return t;
+        }
+
+        /**
+         * Checks that the enqueuing thread hasn't terminated abnormally.
+         * Returns normally if that thread is still running or has completed
+         * successfully.  Propagates an InterruptedException if interrupted
+         * while retrieving the result.  Throws a RuntimeException wrapping
+         * the cause if the enqueuing thread threw an exception.
+         */
+        private void checkThreadOk() throws InterruptedException {
+            if (futureTask.isDone()) {
+                try {
+                    futureTask.get();
+                } catch (ExecutionException e) {
+                    throw new RuntimeException(e.getCause());
+                }
+            }
+        }
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
index 8a06769..4451768 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
@@ -42,8 +42,7 @@ public class FileSystemFetchIterator
     public FileSystemFetchIterator() {
     }
 
-    public FileSystemFetchIterator(String fetcherName, Path basePath) {
-        super(fetcherName);
+    public FileSystemFetchIterator(Path basePath) {
         this.basePath = basePath;
     }
 
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
index c5b5558..0290a59 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
@@ -25,11 +25,6 @@ import java.nio.file.Paths;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -50,23 +45,12 @@ public class FileSystemFetchIteratorTest {
         }
 
         String fetcherName = "fs";
-        ExecutorService es = Executors.newFixedThreadPool(1);
-        ExecutorCompletionService<Integer> cs = new ExecutorCompletionService<>(es);
-        FetchIterator it = new FileSystemFetchIterator(fetcherName, root);
-        it.setQueueSize(20000);
-        ArrayBlockingQueue<FetchEmitTuple> q = it.init(1);
-
-        cs.submit(it);
-
-
-        Future<Integer> f = cs.take();
-        f.get();
+        FetchIterator it = new FileSystemFetchIterator(root);
+        it.setFetcherName(fetcherName);
+        it.setQueueSize(2);
 
         Set<String> iteratorSet = new HashSet<>();
-        for (FetchEmitTuple p : q) {
-            if (p == FetchIterator.COMPLETED_SEMAPHORE) {
-                break;
-            }
+        for (FetchEmitTuple p : it) {
             iteratorSet.add(p.getFetchKey().getKey());
         }
 
diff --git a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
index 4cae5c9..68fc1ee 100644
--- a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.tika.pipes.emitter.s3;
 
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.SdkClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
@@ -78,7 +79,7 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
  *                  &lt;!-- optional; prefix to add to the path before emitting; default is no prefix --&gt;
  *                  &lt;param name="prefix" type="string"&gt;my-prefix&lt;/param&gt;
  *                  &lt;!-- optional; default is 'json' this will be added to the SOURCE_PATH
- *                                    if no emitter key is specified --&gt;
+ *                                    if no emitter key is specified. Do not add a "." before the extension --&gt;
  *                  &lt;param name="fileExtension" type="string"&gt;json&lt;/param&gt;
  *                  &lt;!-- optional; default is 'true'-- whether to copy the json to a local file before putting to s3 --&gt;
  *                  &lt;param name="spoolToTemp" type="bool"&gt;true&lt;/param&gt;
@@ -148,7 +149,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
      * @param path -- object path, not including the bucket
      * @param is inputStream to copy
      * @param userMetadata this will be written to the s3 ObjectMetadata's userMetadata
-     * @throws TikaEmitterException
+     * @throws TikaEmitterException or IOException if the s3 client throws a runtime AmazonClientException
      */
     @Override
     public void emit(String path, InputStream is, Metadata userMetadata) throws IOException, TikaEmitterException {
@@ -173,22 +174,22 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
                 }
             }
         }
-        ObjectMetadata objectMetadata = new ObjectMetadata();
-        if (length > 0) {
-            objectMetadata.setContentLength(length);
-        }
-        for (String n : userMetadata.names()) {
-            String[] vals = userMetadata.getValues(n);
-            if (vals.length > 1) {
-                LOGGER.warn("Can only write the first value for key {}. I see {} values.",
-                        n, vals.length);
-            }
-            objectMetadata.addUserMetadata(n, vals[0]);
-        }
         try {
+            ObjectMetadata objectMetadata = new ObjectMetadata();
+            if (length > 0) {
+                objectMetadata.setContentLength(length);
+            }
+            for (String n : userMetadata.names()) {
+                String[] vals = userMetadata.getValues(n);
+                if (vals.length > 1) {
+                    LOGGER.warn("Can only write the first value for key {}. I see {} values.",
+                            n, vals.length);
+                }
+                objectMetadata.addUserMetadata(n, vals[0]);
+            }
             s3Client.putObject(bucket, path, is, objectMetadata);
-        } catch (SdkClientException e) {
-            throw new TikaEmitterException("problem writing s3object", e);
+        } catch (AmazonClientException e) {
+            throw new IOException("problem writing s3object", e);
         }
     }
 
@@ -248,6 +249,12 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
 
 
 
+    /**
+     * This initializes the s3 client. Note: we wrap S3's RuntimeExceptions
+     * (e.g. AmazonClientException) in a TikaConfigException.
+     * @param params params to use for initialization
+     * @throws TikaConfigException
+     */
     @Override
     public void initialize(Map<String, Param> params) throws TikaConfigException {
         //params have already been set...ignore them
@@ -261,10 +268,14 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
                     "must be either 'instance' or 'profile'");
         }
 
-        s3Client = AmazonS3ClientBuilder.standard()
-                .withRegion(region)
-                .withCredentials(provider)
-                .build();
+        try {
+            s3Client = AmazonS3ClientBuilder.standard()
+                    .withRegion(region)
+                    .withCredentials(provider)
+                    .build();
+        } catch (AmazonClientException e) {
+            throw new TikaConfigException("can't initialize s3 emitter", e);
+        }
     }
 
     @Override
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
index 74f3ebe..55ae655 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.tika.pipes.fetchiterator.FetchIterator.COMPLETED_SEMAPHORE;
 import static org.junit.Assert.assertEquals;
 
 public class TestCSVFetchIterator {
@@ -46,27 +47,32 @@ public class TestCSVFetchIterator {
         it.setCsvPath(p);
         it.setFetchKeyColumn("fetchKey");
         int numConsumers = 2;
-        ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
-        ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
+        ExecutorService es = Executors.newFixedThreadPool(numConsumers);
         ExecutorCompletionService c = new ExecutorCompletionService(es);
-        c.submit(it);
+        ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(100);
         List<MockFetcher> fetchers = new ArrayList<>();
         for (int i = 0; i < numConsumers; i++) {
             MockFetcher f = new MockFetcher(queue);
             fetchers.add(f);
             c.submit(f);
         }
+        for (FetchEmitTuple t : it) {
+            queue.offer(t);
+        }
+        for (int i = 0; i < numConsumers; i++) {
+            queue.offer(COMPLETED_SEMAPHORE);
+        }
         int finished = 0;
         int completed = 0;
         try {
-            while (finished++ < numConsumers+1) {
+            while (finished++ < numConsumers) {
                 Future<Integer> f = c.take();
                 completed += f.get();
             }
         } finally {
             es.shutdownNow();
         }
-        assertEquals(10, completed);
+        assertEquals(5, completed);
         for (MockFetcher f : fetchers) {
             for (FetchEmitTuple t : f.pairs) {
                 String id = t.getMetadata().get("id");
@@ -79,27 +85,16 @@ public class TestCSVFetchIterator {
         }
     }
 
-    @Test(expected = ExecutionException.class)
+    @Test(expected = RuntimeException.class)
     public void testBadFetchKeyCol() throws Exception {
         Path p = get("test-simple.csv");
         CSVFetchIterator it = new CSVFetchIterator();
         it.setFetcherName("fs");
         it.setCsvPath(p);
         it.setFetchKeyColumn("fetchKeyDoesntExist");
-        ExecutorService es = Executors.newFixedThreadPool(2);
-        ExecutorCompletionService c = new ExecutorCompletionService(es);
-        c.submit(it);
-        c.submit(new MockFetcher(it.init(1)));
-        int finished = 0;
-        try {
-            while (finished++ < 2) {
-                Future f = c.take();
-                f.get();
-            }
-        } finally {
-            es.shutdownNow();
-        }
+        for (FetchEmitTuple t : it) {
 
+        }
     }
 
     private Path get(String testFileName) throws Exception {
@@ -117,7 +112,7 @@ public class TestCSVFetchIterator {
         public Integer call() throws Exception {
             while (true) {
                 FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
-                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+                if (t == COMPLETED_SEMAPHORE) {
                     return pairs.size();
                 }
                 pairs.add(t);
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index 19da544..112f52a 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -93,6 +93,10 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
         this.select = select;
     }
 
+    public String getSelect() {
+        return select;
+    }
+
     @Override
     protected void enqueue() throws InterruptedException, IOException, TimeoutException {
         String fetcherName = getFetcherName();
@@ -152,14 +156,18 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
         for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
             if (i == fetchEmitKeyIndices.fetchKeyIndex) {
                 fetchKey = getString(i, rs);
+                if (fetchKey == null) {
+                    LOGGER.debug("fetchKey is empty for record " + toString(rs));
+                }
                 fetchKey = (fetchKey == null) ? "" : fetchKey;
-                LOGGER.debug("fetchKey is empty for record "+toString(rs));
                 continue;
             }
             if (i == fetchEmitKeyIndices.emitKeyIndex) {
                 emitKey = getString(i, rs);
+                if (emitKey == null) {
+                    LOGGER.debug("emitKey is empty for record "+toString(rs));
+                }
                 emitKey = (emitKey == null) ? "" : emitKey;
-                LOGGER.warn("emitKey is empty for record "+toString(rs));
                 continue;
             }
             String val = getString(i, rs);
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
index ee3fcee..9a92fc5 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
@@ -91,24 +91,29 @@ public class TestJDBCFetchIterator {
         int numConsumers = 5;
         FetchIterator fetchIterator = tk.getFetchIterator();
         ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
-        ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(es);
-        ArrayBlockingQueue<FetchEmitTuple> queue = fetchIterator.init(numConsumers);
-        completionService.submit(fetchIterator);
+        ExecutorCompletionService<Integer> completionService =
+                new ExecutorCompletionService<>(es);
+        ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(100);
         List<MockFetcher> fetchers = new ArrayList<>();
         for (int i = 0; i < numConsumers; i++) {
             MockFetcher mockFetcher = new MockFetcher(queue);
             fetchers.add(mockFetcher);
             completionService.submit(mockFetcher);
         }
+        for (FetchEmitTuple t : fetchIterator) {
+            queue.offer(t);
+        }
+        for (int i = 0; i < numConsumers; i++) {
+            queue.offer(FetchIterator.COMPLETED_SEMAPHORE);
+        }
         int processed = 0;
         int completed = 0;
-        while (completed < numConsumers+1) {
+        while (completed < numConsumers) {
             Future<Integer> f = completionService.take();
             processed += f.get();
             completed++;
         }
-        //fetchiterator added + MockFetcher yields 2x
-        assertEquals(NUM_ROWS * 2, processed);
+        assertEquals(NUM_ROWS, processed);
         int cnt = 0;
         Matcher m = Pattern.compile("fk(\\d+)").matcher("");
         for (MockFetcher f : fetchers) {
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index 15fb849..b8897a8 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.tika.pipes.fetchiterator.s3;
 
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
@@ -44,7 +45,6 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
 public class S3FetchIterator extends FetchIterator implements Initializable {
 
-
     private static final Logger LOGGER = LoggerFactory.getLogger(S3FetchIterator.class);
     private String prefix = "";
     private String region;
@@ -82,6 +82,12 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
         this.credentialsProvider = credentialsProvider;
     }
 
+    /**
+     * This initializes the s3 client. Note: we rethrow S3's RuntimeExceptions
+     * (e.g. AmazonClientException) as a TikaConfigException.
+     * @param params params to use for initialization
+     * @throws TikaConfigException
+     */
     @Override
     public void initialize(Map<String, Param> params) throws TikaConfigException {
         //params have already been set...ignore them
@@ -95,10 +101,14 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
                     "must be either 'instance' or 'profile'");
         }
 
-        s3Client = AmazonS3ClientBuilder.standard()
-                .withRegion(region)
-                .withCredentials(provider)
-                .build();
+        try {
+            s3Client = AmazonS3ClientBuilder.standard()
+                    .withRegion(region)
+                    .withCredentials(provider)
+                    .build();
+        } catch (AmazonClientException e) {
+            throw new TikaConfigException("can't initialize s3 fetchiterator");
+        }
     }
 
     @Override
@@ -112,6 +122,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
     @Override
     protected void enqueue() throws InterruptedException, IOException, TimeoutException {
         String fetcherName = getFetcherName();
+        String emitterName = getEmitterName();
         long start = System.currentTimeMillis();
         int count = 0;
         for (S3ObjectSummary summary : S3Objects.withPrefix(s3Client, bucket, prefix)) {
@@ -121,7 +132,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
                     elapsed);
             tryToAdd(new FetchEmitTuple(
                     new FetchKey(fetcherName, summary.getKey()),
-                    new EmitKey(fetcherName, summary.getKey()),
+                    new EmitKey(emitterName, summary.getKey()),
                     new Metadata(), getOnParseException()));
             count++;
         }
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
index 51d20ac..567920d 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
@@ -46,17 +46,22 @@ public class TestS3FetchIterator {
         it.setRegion("");//select one
         it.initialize(Collections.EMPTY_MAP);
         int numConsumers = 6;
-        ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
+        ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(10);
 
         ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
         ExecutorCompletionService c = new ExecutorCompletionService(es);
-        c.submit(it);
         List<MockFetcher> fetchers = new ArrayList<>();
         for (int i = 0; i < numConsumers; i++) {
             MockFetcher fetcher = new MockFetcher(queue);
             fetchers.add(fetcher);
             c.submit(fetcher);
         }
+        for (FetchEmitTuple t : it) {
+            queue.offer(t);
+        }
+        for (int i = 0; i < numConsumers; i++) {
+            queue.offer(FetchIterator.COMPLETED_SEMAPHORE);
+        }
         int finished = 0;
         int completed = 0;
         try {
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 200e567..506a040 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -16,6 +16,7 @@
  */
 package org.apache.tika.pipes.fetcher.s3;
 
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
@@ -66,24 +67,40 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
         LOGGER.debug("about to fetch fetchkey={} from bucket ({})",
                 fetchKey, bucket);
 
-        S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey));
-        if (extractUserMetadata) {
-            for (Map.Entry<String, String> e :
-                    s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
-                metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+        try {
+            S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey));
+            if (extractUserMetadata) {
+                for (Map.Entry<String, String> e :
+                        s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
+                    metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+                }
             }
-        }
-        if (! spoolToTemp) {
-            return TikaInputStream.get(
-                    s3Object.getObjectContent());
-        } else {
-            long start = System.currentTimeMillis();
-            TikaInputStream tis = TikaInputStream.get(
-                    s3Object.getObjectContent());
-            tis.getPath();
-            long elapsed = System.currentTimeMillis()-start;
-            LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
-            return tis;
+            if (!spoolToTemp) {
+                return TikaInputStream.get(
+                        s3Object.getObjectContent());
+            } else {
+                long start = System.currentTimeMillis();
+                TikaInputStream tis = TikaInputStream.get(
+                        s3Object.getObjectContent());
+                try {
+                    tis.getPath();
+                } catch (IOException | RuntimeException e) {
+                    //don't leak the s3 object stream if spooling to a temp file fails
+                    try {
+                        tis.close();
+                    } catch (IOException closeException) {
+                        e.addSuppressed(closeException);
+                    }
+                    throw e;
+                }
+                long elapsed = System.currentTimeMillis() - start;
+                LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
+                return tis;
+            }
+        } catch (AmazonClientException e) {
+            //the sdk's exceptions are unchecked; rethrow as a checked IOException
+            throw new IOException("s3 client exception fetching " + fetchKey +
+                    " from bucket " + bucket, e);
         }
     }
 
@@ -93,26 +110,42 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
         LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})",
                 fetchKey, startRange, endRange, bucket);
 
-        S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey)
-                .withRange(startRange, endRange));
+        try {
+            S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey)
+                    .withRange(startRange, endRange));
 
-        if (extractUserMetadata) {
-            for (Map.Entry<String, String> e :
-                    s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
-                metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+            if (extractUserMetadata) {
+                for (Map.Entry<String, String> e :
+                        s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
+                    metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+                }
             }
-        }
-        if (! spoolToTemp) {
-            return TikaInputStream.get(
-                    s3Object.getObjectContent());
-        } else {
-            long start = System.currentTimeMillis();
-            TikaInputStream tis = TikaInputStream.get(
-                    s3Object.getObjectContent());
-            tis.getPath();
-            long elapsed = System.currentTimeMillis()-start;
-            LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
-            return tis;
+            if (!spoolToTemp) {
+                return TikaInputStream.get(
+                        s3Object.getObjectContent());
+            } else {
+                long start = System.currentTimeMillis();
+                TikaInputStream tis = TikaInputStream.get(
+                        s3Object.getObjectContent());
+                try {
+                    tis.getPath();
+                } catch (IOException | RuntimeException e) {
+                    //don't leak the s3 object stream if spooling to a temp file fails
+                    try {
+                        tis.close();
+                    } catch (IOException closeException) {
+                        e.addSuppressed(closeException);
+                    }
+                    throw e;
+                }
+                long elapsed = System.currentTimeMillis() - start;
+                LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
+                return tis;
+            }
+        } catch (AmazonClientException e) {
+            //the sdk's exceptions are unchecked; rethrow as a checked IOException
+            throw new IOException("s3 client exception fetching " + fetchKey +
+                    " (range " + startRange + "-" + endRange + ") from bucket " + bucket, e);
         }
     }
 
@@ -154,6 +187,14 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
         this.credentialsProvider = credentialsProvider;
     }
 
+    /**
+     * Builds the s3 client from the already-set region and credentials provider.
+     * The sdk's unchecked AmazonClientException is wrapped in a TikaConfigException.
+     *
+     * @param params ignored; configuration has already been applied through the setters
+     * @throws TikaConfigException if the credentials provider setting is not recognized
+     *                             or the s3 client can't be built
+     */
     @Override
     public void initialize(Map<String, Param> params) throws TikaConfigException {
         //params have already been set...ignore them
@@ -167,10 +208,14 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
                     "must be either 'instance' or 'profile'");
         }
 
-        s3Client = AmazonS3ClientBuilder.standard()
-                .withRegion(region)
-                .withCredentials(provider)
-                .build();
+        try {
+            s3Client = AmazonS3ClientBuilder.standard()
+                    .withRegion(region)
+                    .withCredentials(provider)
+                    .build();
+        } catch (AmazonClientException e) {
+            throw new TikaConfigException("can't initialize s3 fetcher", e);
+        }
     }
 
     @Override
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
index eea1fc3..2804a7a 100644
--- a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
@@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Connection;
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
new file mode 100644
index 0000000..a86f563
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -0,0 +1,71 @@
+package org.apache.tika.pipes.async;
+
+import org.apache.tika.utils.StringUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/**
+ * Settings for the async pipes processor: queue size, maximum number of
+ * consumers and the jdbc connection string for its database.
+ */
+public class AsyncConfig {
+
+    /**
+     * Builds an AsyncConfig.  If no jdbc connection string is configured, this
+     * creates a new temporary directory, points the jdbc string at a file-based
+     * h2 database inside it and records the directory in {@link #getTempDBDir()};
+     * the caller is responsible for deleting that directory.
+     *
+     * @param p path to the config file.  NOTE(review): p is not read yet, so the
+     *          defaults are always returned -- TODO parse the settings from p
+     * @return the config
+     * @throws IOException if the temporary database directory can't be created
+     */
+    public static AsyncConfig load(Path p) throws IOException {
+        AsyncConfig asyncConfig = new AsyncConfig();
+
+        if (StringUtils.isBlank(asyncConfig.getJdbcString())) {
+            asyncConfig.dbDir = Files.createTempDirectory("tika-async-");
+            Path dbFile = asyncConfig.dbDir.resolve("tika-async");
+            asyncConfig.setJdbcString("jdbc:h2:file:" + dbFile.toAbsolutePath().toString() +
+                    ";AUTO_SERVER=TRUE");
+        } else {
+            asyncConfig.dbDir = null;
+        }
+        return asyncConfig;
+    }
+
+    private int queueSize = 1000;
+    private int maxConsumers = 10;
+    private String jdbcString;
+    //non-null only when load() created a temporary h2 database
+    private Path dbDir;
+
+    public int getQueueSize() {
+        return queueSize;
+    }
+
+    public int getMaxConsumers() {
+        return maxConsumers;
+    }
+
+    public String getJdbcString() {
+        return jdbcString;
+    }
+
+    public void setJdbcString(String jdbcString) {
+        this.jdbcString = jdbcString;
+    }
+
+    /**
+     * If no jdbc connection string was specified, this directory contains
+     * the temporary h2 database created by {@link #load(Path)}.  Otherwise, null.
+     *
+     * @return the temporary database directory, or null
+     */
+    public Path getTempDBDir() {
+        return dbDir;
+    }
+}
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
similarity index 85%
copy from tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
copy to tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index eea1fc3..cfe7455 100644
--- a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -6,10 +6,12 @@ import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
 import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
+import java.io.Closeable;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -30,49 +32,95 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-public class AsyncCli {
+public class AsyncProcessor implements Closeable, Callable<Integer> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncCli.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessor.class);
 
-    public static void main(String[] args) throws Exception {
-        Path configPath = Paths.get(args[0]);
-        int maxConsumers = 20;
-        AsyncCli asyncCli = new AsyncCli();
-        Path dbDir = Paths.get("/Users/allison/Desktop/tmp-db");//Files.createTempDirectory("tika-async-db-");
-        try {
-            asyncCli.execute(dbDir, configPath, maxConsumers);
-        } finally {
-            FileUtils.deleteDirectory(dbDir.toFile());
+    private final Path tikaConfigPath;
+    private final AsyncConfig asyncConfig;
+    //bounded hand-off from the offer(...) methods to the AsyncTaskEnqueuer
+    private final ArrayBlockingQueue<FetchEmitTuple> queue;
+
+    /**
+     * Loads the async config eagerly via {@link AsyncConfig#load(Path)} so that the
+     * queue exists, and is sized from that config, before any offer(...) call.
+     *
+     * @param tikaConfigPath path to the tika config
+     * @throws UncheckedIOException if the async config can't be loaded
+     */
+    public AsyncProcessor(Path tikaConfigPath) {
+        this.tikaConfigPath = tikaConfigPath;
+        try {
+            this.asyncConfig = AsyncConfig.load(tikaConfigPath);
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                    "couldn't load async config for " + tikaConfigPath, e);
+        }
+        this.queue = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
+    }
+
+    /**
+     * Enqueues the whole batch, waiting up to {@code offerMs} milliseconds for
+     * the queue to have room for all of it; a partial batch is never added.
+     *
+     * @param fetchEmitTuples tuples to enqueue together
+     * @param offerMs maximum time to wait, in milliseconds
+     * @return {@code true} if the batch was enqueued; {@code false} on timeout
+     * or if interrupted while waiting (the interrupt flag is restored)
+     */
+    public synchronized boolean offer(List<FetchEmitTuple> fetchEmitTuples, long offerMs) {
+        long start = System.currentTimeMillis();
+        //the batch fits once remainingCapacity() >= size; capacity is checked before the timeout
+        while (queue.remainingCapacity() < fetchEmitTuples.size()) {
+            long remaining = offerMs - (System.currentTimeMillis() - start);
+            if (remaining <= 0) {
+                return false;
+            }
+            try {
+                //back off briefly instead of spinning while consumers drain the queue
+                Thread.sleep(Math.min(10L, remaining));
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
         }
+        queue.addAll(fetchEmitTuples);
+        return true;
+    }
 
+    /**
+     * Enqueues a single tuple, waiting up to {@code offerMs} milliseconds for space.
+     *
+     * @return {@code true} if the tuple was enqueued, {@code false} on timeout
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public synchronized boolean offer(FetchEmitTuple t, long offerMs) throws InterruptedException {
+        return queue.offer(t, offerMs, TimeUnit.MILLISECONDS);
     }
 
-    private void execute(Path dbDir, Path configPath, int maxConsumers) throws Exception {
-        TikaConfig tikaConfig = new TikaConfig(configPath);
 
-        String connectionString = setupTables(dbDir);
+    @Override
+    public Integer call() throws Exception {
 
-        ExecutorService executorService = Executors.newFixedThreadPool(maxConsumers + 3);
+        setupTables();
+
+        ExecutorService executorService = Executors.newFixedThreadPool(
+                asyncConfig.getMaxConsumers() + 2);
         ExecutorCompletionService<Integer> executorCompletionService =
                 new ExecutorCompletionService<>(executorService);
 
-        try (Connection connection = DriverManager.getConnection(connectionString)) {
-            FetchIterator fetchIterator = tikaConfig.getFetchIterator();
-            if (fetchIterator instanceof EmptyFetchIterator) {
-                throw new IllegalArgumentException("can't have empty fetch iterator");
-            }
-            ArrayBlockingQueue<FetchEmitTuple> q = fetchIterator.init(maxConsumers);
-            AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(q, connection);
-            executorCompletionService.submit(fetchIterator);
+        try (Connection connection = DriverManager.getConnection(asyncConfig.getJdbcString())) {
+            AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(queue, connection);
+
             executorCompletionService.submit(enqueuer);
             executorCompletionService.submit(new AssignmentManager(connection, enqueuer));
 
-            for (int i = 0; i < maxConsumers; i++) {
+            for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
                 executorCompletionService.submit(new AsyncWorker(connection,
-                        connectionString, i, configPath));
+                        asyncConfig.getJdbcString(), i, tikaConfigPath));
             }
             int completed = 0;
-            while (completed < maxConsumers+3) {
+            while (completed < asyncConfig.getMaxConsumers()+2) {
                 Future<Integer> future = executorCompletionService.take();
                 if (future != null) {
                     int val = future.get();
@@ -83,13 +131,21 @@ public class AsyncCli {
         } finally {
             executorService.shutdownNow();
         }
+        return 1;
     }
 
-    private String setupTables(Path dbDir) throws SQLException {
-        Path dbFile = dbDir.resolve("tika-async");
-        String url = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() +
-                ";AUTO_SERVER=TRUE";
-        Connection connection = DriverManager.getConnection(url);
+    /**
+     * Creates the tables used by the async pipeline, closing the connection
+     * opened to do so.
+     */
+    private void setupTables() throws SQLException {
+        try (Connection connection = DriverManager.getConnection(asyncConfig.getJdbcString())) {
+            createTables(connection);
+        }
+    }
+
+    //runs the CREATE TABLE statements; the caller owns (and closes) the connection
+    private void createTables(Connection connection) throws SQLException {
 
         String sql = "create table parse_queue " +
                 "(id bigint auto_increment primary key," +
@@ -114,7 +170,18 @@ public class AsyncCli {
                 "error_code tinyint)";
         connection.createStatement().execute(sql);
 
-        return url;
+    }
+
+    /**
+     * Deletes the temporary h2 database directory, if {@link AsyncConfig} created
+     * one because no jdbc connection string was configured.
+     */
+    @Override
+    public void close() throws IOException {
+        //TODO: also shut down running tasks; the executor is currently local to call()
+        if (asyncConfig.getTempDBDir() != null) {
+            FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
+        }
     }
 
 
diff --git a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
index efb5d3b..a84c13a 100644
--- a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
+++ b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
@@ -11,4 +11,30 @@ public class AsyncCliTest {
         };
         AsyncCli.main(args);
     }
+
+    /**
+     * Checks that an exception escaping a thread's task is delivered to the
+     * uncaught exception handler installed on that thread.
+     */
+    @Test
+    public void testUnhandled() throws InterruptedException {
+        java.util.concurrent.atomic.AtomicReference<Throwable> caught =
+                new java.util.concurrent.atomic.AtomicReference<>();
+        Thread t = new Thread(new Task());
+        //install before start() so the task can't fail before the handler is in place
+        t.setUncaughtExceptionHandler((thread, ex) -> caught.set(ex));
+        t.start();
+        t.join();
+        Throwable thrown = caught.get();
+        if (thrown == null || !"kaboom".equals(thrown.getMessage())) {
+            throw new AssertionError("expected the uncaught 'kaboom' exception, got: " + thrown);
+        }
+    }
+
+    private static class Task implements Runnable {
+        @Override
+        public void run() {
+            throw new RuntimeException("kaboom");
+        }
+    }
 }