You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/03/01 20:34:30 UTC
[tika] 02/02: WIP -- do not merge...allow instance credentials in
s3 components
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git
commit 221b29d90ecc80d8d59cb0bd9604a925e8b18c67
Author: tballison <ta...@apache.org>
AuthorDate: Mon Mar 1 15:34:14 2021 -0500
WIP -- do not merge...allow instance credentials in s3 components
---
.../tika/exception/TikaTimeoutException.java | 26 +++++
.../tika/pipes/fetchiterator/FetchIterator.java | 123 +++++++++++++++------
.../fetchiterator/FileSystemFetchIterator.java | 3 +-
.../fetchiterator/FileSystemFetchIteratorTest.java | 24 +---
.../apache/tika/pipes/emitter/s3/S3Emitter.java | 51 +++++----
.../src/test/java/TestCSVFetchIterator.java | 35 +++---
.../fetchiterator/jdbc/JDBCFetchIterator.java | 12 +-
.../fetchiterator/jdbc/TestJDBCFetchIterator.java | 17 ++-
.../pipes/fetchiterator/s3/S3FetchIterator.java | 23 +++-
.../fetchiterator/s3/TestS3FetchIterator.java | 9 +-
.../apache/tika/pipes/fetcher/s3/S3Fetcher.java | 97 +++++++++-------
.../java/org/apache/tika/pipes/async/AsyncCli.java | 1 -
.../org/apache/tika/pipes/async/AsyncConfig.java | 54 +++++++++
.../async/{AsyncCli.java => AsyncProcessor.java} | 89 +++++++++------
.../org/apache/tika/pipes/driver/AsyncCliTest.java | 35 ++++++
15 files changed, 414 insertions(+), 185 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/exception/TikaTimeoutException.java b/tika-core/src/main/java/org/apache/tika/exception/TikaTimeoutException.java
new file mode 100644
index 0000000..a53dbd6
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/exception/TikaTimeoutException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.exception;
+
+/**
+ * Runtime/unchecked version of {@link java.util.concurrent.TimeoutException}
+ */
+public class TikaTimeoutException extends RuntimeException {
+ public TikaTimeoutException(String message) {
+ super(message);
+ }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
index 4794691..a92668d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
@@ -21,53 +21,46 @@ import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaTimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Abstract class that handles the testing for timeouts/thread safety
* issues. Concrete classes implement the blocking {@link #enqueue()}.
- * <p>
- * This must be "called", obviously...
+ * If there's an exception in the enqueuing thread, this will throw
+ * a RuntimeException. It will throw an IllegalStateException if
+ * next() is called after hasNext() has returned false.
*/
-public abstract class FetchIterator implements Callable<Integer>, Initializable {
+public abstract class FetchIterator implements Callable<Integer>,
+ Iterable<FetchEmitTuple>, Initializable {
public static final long DEFAULT_MAX_WAIT_MS = 300_000;
public static final int DEFAULT_QUEUE_SIZE = 1000;
+
public static final FetchEmitTuple COMPLETED_SEMAPHORE =
new FetchEmitTuple(null, null, null);
+ private static final Logger LOGGER = LoggerFactory.getLogger(FetchIterator.class);
+
private long maxWaitMs = DEFAULT_MAX_WAIT_MS;
- private int numConsumers = -1;
private ArrayBlockingQueue<FetchEmitTuple> queue = null;
private int queueSize = DEFAULT_QUEUE_SIZE;
private String fetcherName;
private String emitterName;
private int added = 0;
private FetchEmitTuple.ON_PARSE_EXCEPTION onParseException = FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
-
- public FetchIterator() {
-
- }
-
- public FetchIterator(String fetcherName) {
- this.fetcherName = fetcherName;
- }
-
- /**
- * This must be called before 'calling' this object.
- * @param numConsumers
- */
- public ArrayBlockingQueue<FetchEmitTuple> init(int numConsumers) {
- this.queue = new ArrayBlockingQueue<>(queueSize);
- this.numConsumers = numConsumers;
- return queue;
- }
+ private FutureTask<Integer> futureTask;
@Field
public void setFetcherName(String fetcherName) {
@@ -117,26 +110,16 @@ public abstract class FetchIterator implements Callable<Integer>, Initializable
return onParseException;
}
- @Override
public Integer call() throws Exception {
- if (queue == null || numConsumers < 0) {
- throw new IllegalStateException("Must call 'init' before calling this object");
- }
- System.out.println("fetch iterator");
enqueue();
- System.out.println("fetch iterator finshed enqueing");
- for (int i = 0; i < numConsumers; i++) {
- tryToAdd(COMPLETED_SEMAPHORE);
- }
+ tryToAdd(COMPLETED_SEMAPHORE);
return added;
}
protected abstract void enqueue() throws IOException, TimeoutException, InterruptedException;
protected void tryToAdd(FetchEmitTuple p) throws InterruptedException, TimeoutException {
- if (p != COMPLETED_SEMAPHORE) {
- added++;
- }
+ if (p != COMPLETED_SEMAPHORE) { added++; }//count only real tuples; call() returns this count
boolean offered = queue.offer(p, maxWaitMs, TimeUnit.MILLISECONDS);
if (!offered) {
throw new TimeoutException("timed out while offering");
@@ -153,4 +136,76 @@ public abstract class FetchIterator implements Callable<Integer>, Initializable
//no-op
}
+ @Override
+ public Iterator<FetchEmitTuple> iterator() {
+ if (futureTask != null) {
+ throw new IllegalStateException("Can't call iterator more than once!");
+ }
+ futureTask = new FutureTask<>(this);
+ queue = new ArrayBlockingQueue<>(queueSize);
+ new Thread(futureTask).start();
+ return new TupleIterator();
+ }
+
+ private class TupleIterator implements Iterator<FetchEmitTuple> {
+ //one-tuple lookahead filled by hasNext(); null means "not polled yet"
+ private FetchEmitTuple next = null;
+ @Override
+ public boolean hasNext() {
+ if (next == null) {
+ next = pollNext();
+ }
+ return next != COMPLETED_SEMAPHORE;
+ }
+ //next() goes through hasNext(), so it also works without a prior hasNext() call
+ @Override
+ public FetchEmitTuple next() {
+ if (!hasNext()) {
+ throw new IllegalStateException(
+ "don't call next() after hasNext() has returned false!");
+ }
+ FetchEmitTuple ret = next;
+ next = null;//advance lazily: a timeout/failure while waiting for the following tuple must not swallow ret
+ return ret;
+ }
+ //waits up to maxWaitMs; COMPLETED_SEMAPHORE if interrupted, TikaTimeoutException on timeout
+ private FetchEmitTuple pollNext() throws TikaTimeoutException {
+ FetchEmitTuple t = null;
+ long start = System.currentTimeMillis();
+ try {
+ long elapsed = System.currentTimeMillis()-start;
+ while (t == null && elapsed < maxWaitMs) {
+ checkThreadOk();
+ t = queue.poll(100, TimeUnit.MILLISECONDS);
+ elapsed = System.currentTimeMillis()-start;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();//restore the flag for the caller
+ LOGGER.warn("interrupted");
+ return COMPLETED_SEMAPHORE;
+ }
+ if (t == null) {
+ throw new TikaTimeoutException("waited longer than "+
+ maxWaitMs+"ms for the next tuple");
+ }
+ return t;
+ }
+
+ /**
+ * Fails fast if the enqueuing task has died: returns normally while the
+ * task is still running or after it completed successfully, and throws a
+ * RuntimeException wrapping the cause if call() (enqueue() or the final
+ * tryToAdd()) threw. Called before each 100ms poll in pollNext().
+ * @throws InterruptedException declared by futureTask.get()
+ */
+ private void checkThreadOk() throws InterruptedException {
+ if (futureTask.isDone()) {
+ try {
+ futureTask.get();
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+ }
+ }
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
index 8a06769..4451768 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
@@ -42,8 +42,7 @@ public class FileSystemFetchIterator
public FileSystemFetchIterator() {
}
- public FileSystemFetchIterator(String fetcherName, Path basePath) {
- super(fetcherName);
+ public FileSystemFetchIterator(Path basePath) {
this.basePath = basePath;
}
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
index c5b5558..0290a59 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
@@ -25,11 +25,6 @@ import java.nio.file.Paths;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -50,23 +45,12 @@ public class FileSystemFetchIteratorTest {
}
String fetcherName = "fs";
- ExecutorService es = Executors.newFixedThreadPool(1);
- ExecutorCompletionService<Integer> cs = new ExecutorCompletionService<>(es);
- FetchIterator it = new FileSystemFetchIterator(fetcherName, root);
- it.setQueueSize(20000);
- ArrayBlockingQueue<FetchEmitTuple> q = it.init(1);
-
- cs.submit(it);
-
-
- Future<Integer> f = cs.take();
- f.get();
+ FetchIterator it = new FileSystemFetchIterator(root);
+ it.setFetcherName(fetcherName);
+ it.setQueueSize(2);
Set<String> iteratorSet = new HashSet<>();
- for (FetchEmitTuple p : q) {
- if (p == FetchIterator.COMPLETED_SEMAPHORE) {
- break;
- }
+ for (FetchEmitTuple p : it) {
iteratorSet.add(p.getFetchKey().getKey());
}
diff --git a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
index 4cae5c9..68fc1ee 100644
--- a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
@@ -16,6 +16,7 @@
*/
package org.apache.tika.pipes.emitter.s3;
+import com.amazonaws.AmazonClientException;
import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
@@ -78,7 +79,7 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
* <!-- optional; prefix to add to the path before emitting; default is no prefix -->
* <param name="prefix" type="string">my-prefix</param>
* <!-- optional; default is 'json' this will be added to the SOURCE_PATH
- * if no emitter key is specified -->
+ * if no emitter key is specified. Do not add a "." before the extension -->
* <param name="fileExtension" type="string">json</param>
* <!-- optional; default is 'true'-- whether to copy the json to a local file before putting to s3 -->
* <param name="spoolToTemp" type="bool">true</param>
@@ -148,7 +149,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
* @param path -- object path, not including the bucket
* @param is inputStream to copy
* @param userMetadata this will be written to the s3 ObjectMetadata's userMetadata
- * @throws TikaEmitterException
+ * @throws TikaEmitterException or IOException if the s3 client throws a runtime AmazonClientException
*/
@Override
public void emit(String path, InputStream is, Metadata userMetadata) throws IOException, TikaEmitterException {
@@ -173,22 +174,22 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
}
}
}
- ObjectMetadata objectMetadata = new ObjectMetadata();
- if (length > 0) {
- objectMetadata.setContentLength(length);
- }
- for (String n : userMetadata.names()) {
- String[] vals = userMetadata.getValues(n);
- if (vals.length > 1) {
- LOGGER.warn("Can only write the first value for key {}. I see {} values.",
- n, vals.length);
- }
- objectMetadata.addUserMetadata(n, vals[0]);
- }
try {
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ if (length > 0) {
+ objectMetadata.setContentLength(length);
+ }
+ for (String n : userMetadata.names()) {
+ String[] vals = userMetadata.getValues(n);
+ if (vals.length > 1) {
+ LOGGER.warn("Can only write the first value for key {}. I see {} values.",
+ n, vals.length);
+ }
+ objectMetadata.addUserMetadata(n, vals[0]);
+ }
s3Client.putObject(bucket, path, is, objectMetadata);
- } catch (SdkClientException e) {
- throw new TikaEmitterException("problem writing s3object", e);
+ } catch (AmazonClientException e) {
+ throw new IOException("problem writing s3object", e);
}
}
@@ -248,6 +249,12 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
+ /**
+ * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
+ * e.g. AmazonClientException in a TikaConfigException.
+ * @param params params to use for initialization
+ * @throws TikaConfigException
+ */
@Override
public void initialize(Map<String, Param> params) throws TikaConfigException {
//params have already been set...ignore them
@@ -261,10 +268,14 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
"must be either 'instance' or 'profile'");
}
- s3Client = AmazonS3ClientBuilder.standard()
- .withRegion(region)
- .withCredentials(provider)
- .build();
+ try {
+ s3Client = AmazonS3ClientBuilder.standard()
+ .withRegion(region)
+ .withCredentials(provider)
+ .build();
+ } catch (AmazonClientException e) {
+ throw new TikaConfigException("can't initialize s3 emitter", e);
+ }
}
@Override
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
index 74f3ebe..55ae655 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import static org.apache.tika.pipes.fetchiterator.FetchIterator.COMPLETED_SEMAPHORE;
import static org.junit.Assert.assertEquals;
public class TestCSVFetchIterator {
@@ -46,27 +47,32 @@ public class TestCSVFetchIterator {
it.setCsvPath(p);
it.setFetchKeyColumn("fetchKey");
int numConsumers = 2;
- ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
- ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
+ ExecutorService es = Executors.newFixedThreadPool(numConsumers);
ExecutorCompletionService c = new ExecutorCompletionService(es);
- c.submit(it);
+ ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(100);
List<MockFetcher> fetchers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
MockFetcher f = new MockFetcher(queue);
fetchers.add(f);
c.submit(f);
}
+ for (FetchEmitTuple t : it) {
+ queue.put(t);//offer() would silently drop tuples when the bounded queue is full
+ }
+ for (int i = 0; i < numConsumers; i++) {
+ queue.put(COMPLETED_SEMAPHORE);
+ }
int finished = 0;
int completed = 0;
try {
- while (finished++ < numConsumers+1) {
+ while (finished++ < numConsumers) {
Future<Integer> f = c.take();
completed += f.get();
}
} finally {
es.shutdownNow();
}
- assertEquals(10, completed);
+ assertEquals(5, completed);
for (MockFetcher f : fetchers) {
for (FetchEmitTuple t : f.pairs) {
String id = t.getMetadata().get("id");
@@ -79,27 +85,16 @@ public class TestCSVFetchIterator {
}
}
- @Test(expected = ExecutionException.class)
+ @Test(expected = RuntimeException.class)
public void testBadFetchKeyCol() throws Exception {
Path p = get("test-simple.csv");
CSVFetchIterator it = new CSVFetchIterator();
it.setFetcherName("fs");
it.setCsvPath(p);
it.setFetchKeyColumn("fetchKeyDoesntExist");
- ExecutorService es = Executors.newFixedThreadPool(2);
- ExecutorCompletionService c = new ExecutorCompletionService(es);
- c.submit(it);
- c.submit(new MockFetcher(it.init(1)));
- int finished = 0;
- try {
- while (finished++ < 2) {
- Future f = c.take();
- f.get();
- }
- } finally {
- es.shutdownNow();
- }
+ for (FetchEmitTuple t : it) {
+ }
}
private Path get(String testFileName) throws Exception {
@@ -117,7 +112,7 @@ public class TestCSVFetchIterator {
public Integer call() throws Exception {
while (true) {
FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
- if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+ if (t == COMPLETED_SEMAPHORE) {
return pairs.size();
}
pairs.add(t);
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index 19da544..112f52a 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -93,6 +93,10 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
this.select = select;
}
+ public String getSelect() {
+ return select;
+ }
+
@Override
protected void enqueue() throws InterruptedException, IOException, TimeoutException {
String fetcherName = getFetcherName();
@@ -152,14 +156,18 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
if (i == fetchEmitKeyIndices.fetchKeyIndex) {
fetchKey = getString(i, rs);
+ if (fetchKey == null) {
+ LOGGER.debug("fetchKey is empty for record " + toString(rs));
+ }
fetchKey = (fetchKey == null) ? "" : fetchKey;
- LOGGER.debug("fetchKey is empty for record "+toString(rs));
continue;
}
if (i == fetchEmitKeyIndices.emitKeyIndex) {
emitKey = getString(i, rs);
+ if (emitKey == null) {
+ LOGGER.debug("emitKey is empty for record "+toString(rs));
+ }
emitKey = (emitKey == null) ? "" : emitKey;
- LOGGER.warn("emitKey is empty for record "+toString(rs));
continue;
}
String val = getString(i, rs);
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
index ee3fcee..9a92fc5 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
@@ -91,24 +91,29 @@ public class TestJDBCFetchIterator {
int numConsumers = 5;
FetchIterator fetchIterator = tk.getFetchIterator();
ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
- ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(es);
- ArrayBlockingQueue<FetchEmitTuple> queue = fetchIterator.init(numConsumers);
- completionService.submit(fetchIterator);
+ ExecutorCompletionService<Integer> completionService =
+ new ExecutorCompletionService<>(es);
+ ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(100);
List<MockFetcher> fetchers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
MockFetcher mockFetcher = new MockFetcher(queue);
fetchers.add(mockFetcher);
completionService.submit(mockFetcher);
}
+ for (FetchEmitTuple t : fetchIterator) {
+ queue.put(t);//offer() would silently drop tuples when the bounded queue is full
+ }
+ for (int i = 0; i < numConsumers; i++) {
+ queue.put(FetchIterator.COMPLETED_SEMAPHORE);
+ }
int processed = 0;
int completed = 0;
- while (completed < numConsumers+1) {
+ while (completed < numConsumers) {
Future<Integer> f = completionService.take();
processed += f.get();
completed++;
}
- //fetchiterator added + MockFetcher yields 2x
- assertEquals(NUM_ROWS * 2, processed);
+ assertEquals(NUM_ROWS, processed);
int cnt = 0;
Matcher m = Pattern.compile("fk(\\d+)").matcher("");
for (MockFetcher f : fetchers) {
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index 15fb849..b8897a8 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -16,6 +16,7 @@
*/
package org.apache.tika.pipes.fetchiterator.s3;
+import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
@@ -44,7 +45,6 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
public class S3FetchIterator extends FetchIterator implements Initializable {
-
private static final Logger LOGGER = LoggerFactory.getLogger(S3FetchIterator.class);
private String prefix = "";
private String region;
@@ -82,6 +82,12 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
this.credentialsProvider = credentialsProvider;
}
+ /**
+ * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
+ * e.g. AmazonClientException in a TikaConfigException.
+ * @param params params to use for initialization
+ * @throws TikaConfigException
+ */
@Override
public void initialize(Map<String, Param> params) throws TikaConfigException {
//params have already been set...ignore them
@@ -95,10 +101,14 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
"must be either 'instance' or 'profile'");
}
- s3Client = AmazonS3ClientBuilder.standard()
- .withRegion(region)
- .withCredentials(provider)
- .build();
+ try {
+ s3Client = AmazonS3ClientBuilder.standard()
+ .withRegion(region)
+ .withCredentials(provider)
+ .build();
+ } catch (AmazonClientException e) {
+ throw new TikaConfigException("can't initialize s3 fetchiterator", e);
+ }
}
@Override
@@ -112,6 +122,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
@Override
protected void enqueue() throws InterruptedException, IOException, TimeoutException {
String fetcherName = getFetcherName();
+ String emitterName = getEmitterName();
long start = System.currentTimeMillis();
int count = 0;
for (S3ObjectSummary summary : S3Objects.withPrefix(s3Client, bucket, prefix)) {
@@ -121,7 +132,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
elapsed);
tryToAdd(new FetchEmitTuple(
new FetchKey(fetcherName, summary.getKey()),
- new EmitKey(fetcherName, summary.getKey()),
+ new EmitKey(emitterName, summary.getKey()),
new Metadata(), getOnParseException()));
count++;
}
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
index 51d20ac..567920d 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
@@ -46,17 +46,22 @@ public class TestS3FetchIterator {
it.setRegion("");//select one
it.initialize(Collections.EMPTY_MAP);
int numConsumers = 6;
- ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
+ ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(10);
ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
ExecutorCompletionService c = new ExecutorCompletionService(es);
- c.submit(it);
List<MockFetcher> fetchers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
MockFetcher fetcher = new MockFetcher(queue);
fetchers.add(fetcher);
c.submit(fetcher);
}
+ for (FetchEmitTuple t : it) {
+ queue.put(t);//offer() would silently drop tuples when the 10-slot queue is full
+ }
+ for (int i = 0; i < numConsumers; i++) {
+ queue.put(FetchIterator.COMPLETED_SEMAPHORE);
+ }
int finished = 0;
int completed = 0;
try {
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 200e567..506a040 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -16,6 +16,7 @@
*/
package org.apache.tika.pipes.fetcher.s3;
+import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
@@ -66,24 +67,28 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
LOGGER.debug("about to fetch fetchkey={} from bucket ({})",
fetchKey, bucket);
- S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey));
- if (extractUserMetadata) {
- for (Map.Entry<String, String> e :
- s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
- metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+ try {
+ S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey));
+ if (extractUserMetadata) {
+ for (Map.Entry<String, String> e :
+ s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
+ metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+ }
}
- }
- if (! spoolToTemp) {
- return TikaInputStream.get(
- s3Object.getObjectContent());
- } else {
- long start = System.currentTimeMillis();
- TikaInputStream tis = TikaInputStream.get(
- s3Object.getObjectContent());
- tis.getPath();
- long elapsed = System.currentTimeMillis()-start;
- LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
- return tis;
+ if (!spoolToTemp) {
+ return TikaInputStream.get(
+ s3Object.getObjectContent());
+ } else {
+ long start = System.currentTimeMillis();
+ TikaInputStream tis = TikaInputStream.get(
+ s3Object.getObjectContent());
+ tis.getPath();
+ long elapsed = System.currentTimeMillis() - start;
+ LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
+ return tis;
+ }
+ } catch (AmazonClientException e) {
+ throw new IOException("s3 client exception", e);
}
}
@@ -93,26 +98,30 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})",
fetchKey, startRange, endRange, bucket);
- S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey)
- .withRange(startRange, endRange));
+ try {
+ S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey)
+ .withRange(startRange, endRange));
- if (extractUserMetadata) {
- for (Map.Entry<String, String> e :
- s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
- metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+ if (extractUserMetadata) {
+ for (Map.Entry<String, String> e :
+ s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
+ metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+ }
}
- }
- if (! spoolToTemp) {
- return TikaInputStream.get(
- s3Object.getObjectContent());
- } else {
- long start = System.currentTimeMillis();
- TikaInputStream tis = TikaInputStream.get(
- s3Object.getObjectContent());
- tis.getPath();
- long elapsed = System.currentTimeMillis()-start;
- LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
- return tis;
+ if (!spoolToTemp) {
+ return TikaInputStream.get(
+ s3Object.getObjectContent());
+ } else {
+ long start = System.currentTimeMillis();
+ TikaInputStream tis = TikaInputStream.get(
+ s3Object.getObjectContent());
+ tis.getPath();
+ long elapsed = System.currentTimeMillis() - start;
+ LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
+ return tis;
+ }
+ } catch (AmazonClientException e) {
+ throw new IOException("s3 client exception", e);
+ }
}
@@ -154,6 +163,12 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
this.credentialsProvider = credentialsProvider;
}
+ /**
+ * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
+ * e.g. AmazonClientException in a TikaConfigException.
+ * @param params params to use for initialization
+ * @throws TikaConfigException
+ */
@Override
public void initialize(Map<String, Param> params) throws TikaConfigException {
//params have already been set...ignore them
@@ -167,10 +182,14 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
"must be either 'instance' or 'profile'");
}
- s3Client = AmazonS3ClientBuilder.standard()
- .withRegion(region)
- .withCredentials(provider)
- .build();
+ try {
+ s3Client = AmazonS3ClientBuilder.standard()
+ .withRegion(region)
+ .withCredentials(provider)
+ .build();
+ } catch (AmazonClientException e) {
+ throw new TikaConfigException("can't initialize s3 fetcher", e);
+ }
}
@Override
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
index eea1fc3..2804a7a 100644
--- a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
@@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
new file mode 100644
index 0000000..a86f563
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -0,0 +1,87 @@
+package org.apache.tika.pipes.async;
+
+import org.apache.tika.utils.StringUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/**
+ * Settings for {@link AsyncProcessor}: queue size, number of consumers and
+ * the jdbc connection string of the database that backs the parse queue.
+ */
+public class AsyncConfig {
+
+ private int queueSize = 1000;
+ private int maxConsumers = 10;
+ private String jdbcString;
+ //tmp directory holding the h2 db; only set if no jdbc string is configured
+ private Path dbDir;
+
+ /**
+ * Loads the async config.
+ * <p>
+ * NOTE: {@code p} is not read yet, so every setting keeps its default.
+ * In particular the jdbc string is always blank at this point, and a
+ * temporary h2 database is always created.
+ *
+ * @param p path to the config file (currently unused)
+ * @return the config
+ * @throws IOException if the temporary database directory can't be created
+ */
+ public static AsyncConfig load(Path p) throws IOException {
+ AsyncConfig asyncConfig = new AsyncConfig();
+ //TODO: read queueSize, maxConsumers and jdbcString from p here,
+ //before the temp-db fallback below looks at jdbcString
+ asyncConfig.initTempDbIfNeeded();
+ return asyncConfig;
+ }
+
+ /**
+ * Falls back to an h2 file database in a new temporary directory when no
+ * jdbc string has been configured; otherwise leaves the jdbc string alone
+ * and clears {@link #dbDir}.
+ */
+ private void initTempDbIfNeeded() throws IOException {
+ if (!StringUtils.isBlank(jdbcString)) {
+ dbDir = null;
+ return;
+ }
+ dbDir = Files.createTempDirectory("tika-async-");
+ Path dbFile = dbDir.resolve("tika-async");
+ //AUTO_SERVER=TRUE is h2's automatic mixed mode: other processes may
+ //connect to the same file database
+ setJdbcString("jdbc:h2:file:" + dbFile.toAbsolutePath().toString() +
+ ";AUTO_SERVER=TRUE");
+ }
+
+ /** @return capacity of the queue that {@link AsyncProcessor} creates */
+ public int getQueueSize() {
+ return queueSize;
+ }
+
+ /** @return number of workers that {@link AsyncProcessor} starts */
+ public int getMaxConsumers() {
+ return maxConsumers;
+ }
+
+ /** @return jdbc connection string of the database backing the parse queue */
+ public String getJdbcString() {
+ return jdbcString;
+ }
+
+ /** @param jdbcString jdbc connection string of the database backing the parse queue */
+ public void setJdbcString(String jdbcString) {
+ this.jdbcString = jdbcString;
+ }
+
+ /**
+ * If no jdbc connection was specified, this
+ * dir contains the h2 database. Otherwise, null.
+ *
+ * @return the temporary h2 database directory, or null
+ */
+ public Path getTempDBDir() {
+ return dbDir;
+ }
+}
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
similarity index 85%
copy from tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
copy to tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index eea1fc3..cfe7455 100644
--- a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -6,10 +6,12 @@ import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+import java.io.Closeable;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -30,49 +32,105 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-public class AsyncCli {
+public class AsyncProcessor implements Closeable, Callable<Integer> {
- private static final Logger LOG = LoggerFactory.getLogger(AsyncCli.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessor.class);
- public static void main(String[] args) throws Exception {
- Path configPath = Paths.get(args[0]);
- int maxConsumers = 20;
- AsyncCli asyncCli = new AsyncCli();
- Path dbDir = Paths.get("/Users/allison/Desktop/tmp-db");//Files.createTempDirectory("tika-async-db-");
- try {
- asyncCli.execute(dbDir, configPath, maxConsumers);
- } finally {
- FileUtils.deleteDirectory(dbDir.toFile());
+ private final Path tikaConfigPath;
+ private final AsyncConfig asyncConfig;
+ private final ArrayBlockingQueue<FetchEmitTuple> queue;
+
+ /**
+ * Creates a processor for the given tika config and loads its
+ * {@link AsyncConfig} right away: the queue's capacity comes from that
+ * config, and the queue must exist before {@code offer(...)} is called.
+ * Call {@link #close()} to delete any temporary database directory.
+ *
+ * @param tikaConfigPath path to the tika config file
+ * @throws UncheckedIOException if the async config can't be loaded
+ */
+ public AsyncProcessor(Path tikaConfigPath) {
+ this.tikaConfigPath = tikaConfigPath;
+ try {
+ this.asyncConfig = AsyncConfig.load(tikaConfigPath);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ this.queue = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
+ }
+
+ /**
+ * Adds all of {@code fetchEmitTuples} to the queue, or none of them.
+ * Retries until the whole batch fits or {@code offerMs} has elapsed; capacity
+ * is checked at least once, even if {@code offerMs <= 0}. A batch larger than
+ * the queue's total capacity can never fit and times out.
+ *
+ * @param fetchEmitTuples tuples to enqueue as one batch
+ * @param offerMs maximum time to wait for capacity, in milliseconds
+ * @return true if the batch was enqueued; false on timeout or interrupt
+ */
+ public synchronized boolean offer(List<FetchEmitTuple> fetchEmitTuples, long offerMs) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ //>= because a batch that exactly fills the remaining capacity still fits
+ if (queue.remainingCapacity() >= fetchEmitTuples.size()) {
+ queue.addAll(fetchEmitTuples);
+ return true;
+ }
+ if (System.currentTimeMillis() - start >= offerMs) {
+ return false;
+ }
+ try {
+ //back off briefly instead of busy-spinning while holding the lock
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
}
+ }
+
+ /**
+ * Adds one tuple, waiting up to {@code offerMs} for space in the queue.
+ *
+ * @param t tuple to enqueue
+ * @param offerMs maximum time to wait for space, in milliseconds
+ * @return true if the tuple was enqueued, false if the wait timed out
+ * @throws InterruptedException if interrupted while waiting
+ */
+ public synchronized boolean offer(FetchEmitTuple t, long offerMs) throws InterruptedException {
+ return queue.offer(t, offerMs, TimeUnit.MILLISECONDS);
}
- private void execute(Path dbDir, Path configPath, int maxConsumers) throws Exception {
- TikaConfig tikaConfig = new TikaConfig(configPath);
- String connectionString = setupTables(dbDir);
+ /**
+ * Creates the database tables, then submits the enqueuer, the assignment
+ * manager and one {@link AsyncWorker} per configured consumer, and collects
+ * their results through a completion service.
+ */
+ @Override
+ public Integer call() throws Exception {
- ExecutorService executorService = Executors.newFixedThreadPool(maxConsumers + 3);
+ setupTables();
+
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ asyncConfig.getMaxConsumers() + 2);
ExecutorCompletionService<Integer> executorCompletionService =
new ExecutorCompletionService<>(executorService);
- try (Connection connection = DriverManager.getConnection(connectionString)) {
- FetchIterator fetchIterator = tikaConfig.getFetchIterator();
- if (fetchIterator instanceof EmptyFetchIterator) {
- throw new IllegalArgumentException("can't have empty fetch iterator");
- }
- ArrayBlockingQueue<FetchEmitTuple> q = fetchIterator.init(maxConsumers);
- AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(q, connection);
- executorCompletionService.submit(fetchIterator);
+ try (Connection connection = DriverManager.getConnection(asyncConfig.getJdbcString())) {
+ AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(queue, connection);
+
executorCompletionService.submit(enqueuer);
executorCompletionService.submit(new AssignmentManager(connection, enqueuer));
- for (int i = 0; i < maxConsumers; i++) {
+ for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
executorCompletionService.submit(new AsyncWorker(connection,
- connectionString, i, configPath));
+ asyncConfig.getJdbcString(), i, tikaConfigPath));
}
int completed = 0;
- while (completed < maxConsumers+3) {
+ while (completed < asyncConfig.getMaxConsumers()+2) {
Future<Integer> future = executorCompletionService.take();
if (future != null) {
int val = future.get();
@@ -83,13 +141,11 @@ public class AsyncCli {
} finally {
executorService.shutdownNow();
}
+ return 1;
}
- private String setupTables(Path dbDir) throws SQLException {
- Path dbFile = dbDir.resolve("tika-async");
- String url = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() +
- ";AUTO_SERVER=TRUE";
- Connection connection = DriverManager.getConnection(url);
+ private void setupTables() throws SQLException {
+ try (Connection connection = DriverManager.getConnection(asyncConfig.getJdbcString())) {
String sql = "create table parse_queue " +
"(id bigint auto_increment primary key," +
@@ -114,7 +170,19 @@ public class AsyncCli {
"error_code tinyint)";
connection.createStatement().execute(sql);
- return url;
+ }
+ }
+
+ /**
+ * Deletes the temporary h2 database directory, if one was created
+ * because no jdbc string was configured.
+ */
+ @Override
+ public void close() throws IOException {
+ //TODO: close down processes and db
+ if (asyncConfig.getTempDBDir() != null) {
+ FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
+ }
}
diff --git a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
index efb5d3b..a84c13a 100644
--- a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
+++ b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
@@ -11,4 +11,51 @@ public class AsyncCliTest {
};
AsyncCli.main(args);
}
+
+ /**
+ * Checks that an exception escaping a thread's run() is delivered to
+ * that thread's UncaughtExceptionHandler.
+ */
+ @Test
+ public void testUnhandled() throws InterruptedException {
+ MyUncaught handler = new MyUncaught();
+ Thread t = new Thread(new Task());
+ //install the handler before start() so it is in place however fast run() fails
+ t.setUncaughtExceptionHandler(handler);
+ t.start();
+ t.join();
+ //the handler runs on the dying thread, so it has finished once join() returns;
+ //a terminated thread's getStackTrace() is empty, so check the captured exception
+ Throwable caught = handler.caught;
+ if (caught == null || !"kaboom".equals(caught.getMessage())) {
+ throw new AssertionError("expected 'kaboom' in the uncaught handler, got: " + caught);
+ }
+ }
+
+ /** Sleeps briefly, then dies with an unchecked "kaboom" exception. */
+ private static class Task implements Runnable {
+
+ @Override
+ public void run() {
+ for (int i = 0; i < 5; i++) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ throw new RuntimeException("kaboom");
+ }
+ }
+
+ /** Records the exception that terminated the thread. */
+ private static class MyUncaught implements Thread.UncaughtExceptionHandler {
+ private volatile Throwable caught;
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ caught = e;
+ }
+ }
}