Posted to commits@tika.apache.org by ta...@apache.org on 2021/03/26 21:49:07 UTC
[tika] 01/02: TIKA-3304 -- basically works...lots more remains
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git
commit c3ca99a8d1032c3011ed8d2fc83dc6f0d3d5c0d9
Author: tballison <ta...@apache.org>
AuthorDate: Fri Mar 26 17:35:30 2021 -0400
TIKA-3304 -- basically works...lots more remains
---
pom.xml | 2 +-
.../org/apache/tika/pipes/fetcher/FetchKey.java | 2 +-
.../fetchiterator/FileSystemFetchIteratorTest.java | 2 +-
tika-pipes/pom.xml | 37 ++-
tika-pipes/tika-emitters/tika-emitter-fs/pom.xml | 13 +-
.../tika/pipes/emitter/fs/FileSystemEmitter.java | 44 ++-
tika-pipes/tika-emitters/tika-emitter-s3/pom.xml | 13 +-
.../apache/tika/pipes/emitter/s3/S3Emitter.java | 103 +++---
tika-pipes/tika-emitters/tika-emitter-solr/pom.xml | 13 +-
.../tika/pipes/emitter/solr/SolrEmitter.java | 129 ++++----
.../apache/tika/pipes/emitter/solr/TestBasic.java | 34 +-
.../src/test/resources/log4j.properties | 6 +-
.../pipes/fetchiterator/csv/CSVFetchIterator.java | 82 ++---
.../src/test/java/TestCSVFetchIterator.java | 23 +-
.../tika-fetch-iterator-jdbc/pom.xml | 17 +-
.../fetchiterator/jdbc/JDBCFetchIterator.java | 82 ++---
.../fetchiterator/jdbc/TestJDBCFetchIterator.java | 66 ++--
.../src/test/resources/log4j.properties | 4 +-
.../tika-fetch-iterator-s3/pom.xml | 13 +-
.../pipes/fetchiterator/s3/S3FetchIterator.java | 39 ++-
.../fetchiterator/s3/TestS3FetchIterator.java | 17 +-
.../src/test/resources/log4j.properties | 4 +-
tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml | 13 +-
.../tika/pipes/fetcher/http/HttpFetcher.java | 33 +-
.../tika/pipes/fetcher/http/HttpFetcherTest.java | 50 +--
.../apache/tika/pipes/fetcher/s3/S3Fetcher.java | 70 ++--
.../tika/pipes/fetcher/s3/TestS3Fetcher.java | 23 +-
.../org/apache/tika/client/HttpClientFactory.java | 147 ++++-----
.../org/apache/tika/client/HttpClientUtil.java | 26 +-
tika-pipes/tika-pipes-async/pom.xml | 4 +-
.../java/org/apache/tika/pipes/async/AsyncCli.java | 128 ++++----
.../org/apache/tika/pipes/async/AsyncConfig.java | 44 ++-
.../org/apache/tika/pipes/async/AsyncData.java | 53 ++-
.../org/apache/tika/pipes/async/AsyncEmitHook.java | 16 +
.../org/apache/tika/pipes/async/AsyncEmitter.java | 130 ++------
.../tika/pipes/async/AsyncEmitterProcess.java | 309 +++++++++++------
.../tika/pipes/async/AsyncPipesEmitHook.java | 28 +-
.../apache/tika/pipes/async/AsyncProcessor.java | 364 ++++++++++++++-------
.../org/apache/tika/pipes/async/AsyncTask.java | 40 ++-
.../org/apache/tika/pipes/async/AsyncWorker.java | 157 +++++----
.../tika/pipes/async/AsyncWorkerProcess.java | 298 ++++++++---------
.../src/main/resources/log4j.properties | 4 +-
.../org/apache/tika/pipes/async/AsyncCliTest.java | 48 ---
.../tika/pipes/async/AsyncProcessorTest.java | 89 +++--
.../org/apache/tika/pipes/async/MockEmitter.java | 59 +++-
.../org/apache/tika/pipes/async/MockFetcher.java | 33 +-
.../apache/tika/pipes/async/SerializationTest.java | 49 +++
.../apache/tika/pipes/async/TestPipesDriver.java | 109 ------
.../apache/tika/pipes/PipeIntegrationTests.java | 81 +++--
.../src/test/resources/log4j.properties | 4 +-
.../metadata/serialization/JsonFetchEmitTuple.java | 2 +-
.../tika/metadata/serialization/JsonMetadata.java | 14 +-
.../serialization/JsonMetadataDeserializer.java | 43 +++
.../serialization/JsonMetadataSerializer.java | 41 +++
.../tika/server/core/resource/AsyncParser.java | 10 +-
.../tika/server/core/resource/AsyncResource.java | 2 +-
.../tika/server/core/resource/EmitterResource.java | 8 +-
57 files changed, 1794 insertions(+), 1480 deletions(-)
diff --git a/pom.xml b/pom.xml
index 17bc12d..1ad3c85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,11 +37,11 @@
<modules>
<module>tika-parent</module>
<module>tika-core</module>
+ <module>tika-serialization</module>
<module>tika-pipes</module>
<module>tika-parsers</module>
<module>tika-bundles</module>
<module>tika-xmp</module>
- <module>tika-serialization</module>
<module>tika-batch</module>
<module>tika-langdetect</module>
<module>tika-app</module>
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
index deb4232..2c0ea64 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
@@ -38,7 +38,7 @@ public class FetchKey {
return fetcherName;
}
- public String getKey() {
+ public String getFetchKey() {
return fetchKey;
}
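Downstream callers must track this accessor rename from getKey() to getFetchKey(); a minimal before/after sketch, where tuple is a hypothetical FetchEmitTuple (the test change just below shows the same update):

    // before this commit:
    String key = tuple.getFetchKey().getKey();
    // after this commit:
    String key = tuple.getFetchKey().getFetchKey();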
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
index 7d99828..4e314bc 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
@@ -61,7 +61,7 @@ public class FileSystemFetchIteratorTest {
Set<String> iteratorSet = new HashSet<>();
for (FetchEmitTuple p : it) {
- iteratorSet.add(p.getFetchKey().getKey());
+ iteratorSet.add(p.getFetchKey().getFetchKey());
}
assertEquals(truthSet, iteratorSet);
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 560ed2d..21a5a49 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -14,7 +14,8 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
@@ -39,4 +40,38 @@
<module>tika-pipes-integration-tests</module>
</modules>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>${checkstyle.plugin.version}</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>8.41</version>
+ </dependency>
+ </dependencies>
+ <executions>
+ <execution>
+ <id>validate</id>
+ <phase>validate</phase>
+ <configuration>
+ <configLocation>checkstyle.xml</configLocation>
+ <encoding>UTF-8</encoding>
+ <consoleOutput>false</consoleOutput>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <testSourceDirectories>${project.basedir}/src/test/java</testSourceDirectories>
+ <violationSeverity>error</violationSeverity>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/tika-pipes/tika-emitters/tika-emitter-fs/pom.xml b/tika-pipes/tika-emitters/tika-emitter-fs/pom.xml
index cf214da..df61b96 100644
--- a/tika-pipes/tika-emitters/tika-emitter-fs/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-fs/pom.xml
@@ -17,8 +17,8 @@
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>tika-emitters</artifactId>
@@ -90,15 +90,18 @@
</filter>
</filters>
<transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/LICENSE</resource>
<file>target/classes/META-INF/LICENSE</file>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/NOTICE</resource>
<file>target/classes/META-INF/NOTICE</file>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/DEPENDENCIES</resource>
<file>target/classes/META-INF/DEPENDENCIES</file>
</transformer>
diff --git a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
index d4a0448..285142b 100644
--- a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
@@ -16,16 +16,6 @@
*/
package org.apache.tika.pipes.emitter.fs;
-import org.apache.tika.config.Field;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.StreamEmitter;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.serialization.JsonMetadataList;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.Writer;
@@ -35,9 +25,15 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
-import java.util.Collections;
import java.util.List;
-import java.util.Set;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.StreamEmitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
/**
* Emitter to write to a file system.
@@ -56,7 +52,8 @@ import java.util.Set;
* <param name="basePath" type="string">/path/to/output</param>
* <!-- optional; default is 'json' -->
* <param name="fileExtension" type="string">json</param>
- * <!-- optional; if the file already exists, options ('skip', 'replace', 'exception')
+ * <!-- optional; if the file already exists,
+ * options ('skip', 'replace', 'exception')
* default is 'exception' -->
* <param name="onExists" type="string">skip</param>
* </params>
@@ -66,17 +63,13 @@ import java.util.Set;
*/
public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter {
- enum ON_EXISTS {
- SKIP, EXCEPTION, REPLACE
- }
-
private Path basePath = null;
private String fileExtension = "json";
private ON_EXISTS onExists = ON_EXISTS.EXCEPTION;
-
@Override
- public void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException {
+ public void emit(String emitKey, List<Metadata> metadataList)
+ throws IOException, TikaEmitterException {
Path output;
if (metadataList == null || metadataList.size() == 0) {
throw new TikaEmitterException("metadata list must not be null or of size 0");
@@ -124,15 +117,14 @@ public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter
} else if (onExists.equals("exception")) {
this.onExists = ON_EXISTS.EXCEPTION;
} else {
- throw new IllegalArgumentException(
- "Don't understand '" + onExists +
- "'; must be one of: 'skip', 'replace', 'exception'");
+ throw new IllegalArgumentException("Don't understand '" + onExists +
+ "'; must be one of: 'skip', 'replace', 'exception'");
}
}
@Override
- public void emit(String path, InputStream inputStream, Metadata userMetadata) throws IOException,
- TikaEmitterException {
+ public void emit(String path, InputStream inputStream, Metadata userMetadata)
+ throws IOException, TikaEmitterException {
Path target = basePath.resolve(path);
if (!Files.isDirectory(target.getParent())) {
@@ -152,4 +144,8 @@ public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter
}
}
}
+
+ enum ON_EXISTS {
+ SKIP, EXCEPTION, REPLACE
+ }
}
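A usage sketch for the reorganized emitter, assuming a tika-config.xml that registers a FileSystemEmitter under the hypothetical name "fs" with the basePath, fileExtension and onExists params from the javadoc above; the config resource and emitter name are placeholders for illustration:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.tika.config.TikaConfig;
    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.metadata.TikaCoreProperties;
    import org.apache.tika.pipes.emitter.Emitter;

    public class FsEmitterSketch {
        public static void main(String[] args) throws Exception {
            // "tika-config.xml" and the emitter name "fs" are assumptions
            TikaConfig tikaConfig = new TikaConfig(
                    FsEmitterSketch.class.getResourceAsStream("/tika-config.xml"));
            Emitter emitter = tikaConfig.getEmitterManager().getEmitter("fs");

            Metadata m = new Metadata();
            m.set(TikaCoreProperties.TIKA_CONTENT, "quick brown fox");
            List<Metadata> metadataList = new ArrayList<>();
            metadataList.add(m);

            // with basePath=/path/to/output and fileExtension=json this should
            // write /path/to/output/doc1.json; an existing file is skipped,
            // replaced, or triggers an exception per the onExists setting
            emitter.emit("doc1", metadataList);
        }
    }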
diff --git a/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
index 7b7def3..773865c 100644
--- a/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
@@ -17,8 +17,8 @@
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>tika-emitters</artifactId>
@@ -137,15 +137,18 @@
</filter>
</filters>
<transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/LICENSE</resource>
<file>target/classes/META-INF/LICENSE</file>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/NOTICE</resource>
<file>target/classes/META-INF/NOTICE</file>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/DEPENDENCIES</resource>
<file>target/classes/META-INF/DEPENDENCIES</file>
</transformer>
diff --git a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
index 68fc1ee..6f9c745 100644
--- a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
@@ -16,15 +16,31 @@
*/
package org.apache.tika.pipes.emitter.s3;
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.Map;
+
import com.amazonaws.AmazonClientException;
-import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectMetadata;
-import org.apache.http.client.CredentialsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
@@ -33,31 +49,13 @@ import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.io.TemporaryResources;
import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.serialization.JsonMetadataList;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.StreamEmitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.metadata.Metadata;
import org.apache.tika.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
/**
* Emits to existing s3 bucket
@@ -71,17 +69,21 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
* <!-- required -->
* <param name="region" type="string">us-east-1</param>
* <!-- required -->
- * <param name="credentialsProvider" type="string">(profile|instance)</param>
+ * <param name="credentialsProvider"
+ * type="string">(profile|instance)</param>
* <!-- required if credentialsProvider=profile-->
* <param name="profile" type="string">my-profile</param>
* <!-- required -->
* <param name="bucket" type="string">my-bucket</param>
- * <!-- optional; prefix to add to the path before emitting; default is no prefix -->
+ * <!-- optional; prefix to add to the path before emitting;
+ * default is no prefix -->
* <param name="prefix" type="string">my-prefix</param>
* <!-- optional; default is 'json' this will be added to the SOURCE_PATH
- * if no emitter key is specified. Do not add a "." before the extension -->
+ * if no emitter key is specified. Do not add a "."
+ * before the extension -->
* <param name="fileExtension" type="string">json</param>
- * <!-- optional; default is 'true'-- whether to copy the json to a local file before putting to s3 -->
+ * <!-- optional; default is 'true'-- whether to copy the
+ * json to a local file before putting to s3 -->
* <param name="spoolToTemp" type="bool">true</param>
* </params>
* </emitter>
@@ -108,15 +110,16 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
* @throws TikaException
*/
@Override
- public void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException {
+ public void emit(String emitKey, List<Metadata> metadataList)
+ throws IOException, TikaEmitterException {
if (metadataList == null || metadataList.size() == 0) {
throw new TikaEmitterException("metadata list must not be null or of size 0");
}
- if (! spoolToTemp) {
+ if (!spoolToTemp) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try (Writer writer =
- new BufferedWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8))) {
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(bos, StandardCharsets.UTF_8))) {
JsonMetadataList.toJson(metadataList, writer);
} catch (IOException e) {
throw new TikaEmitterException("can't jsonify", e);
@@ -129,8 +132,8 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
TemporaryResources tmp = new TemporaryResources();
try {
Path tmpPath = tmp.createTempFile();
- try (Writer writer = Files.newBufferedWriter(tmpPath,
- StandardCharsets.UTF_8, StandardOpenOption.CREATE)) {
+ try (Writer writer = Files.newBufferedWriter(tmpPath, StandardCharsets.UTF_8,
+ StandardOpenOption.CREATE)) {
JsonMetadataList.toJson(metadataList, writer);
} catch (IOException e) {
throw new TikaEmitterException("can't jsonify", e);
@@ -145,28 +148,27 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
}
/**
- *
- * @param path -- object path, not including the bucket
- * @param is inputStream to copy
+ * @param path -- object path, not including the bucket
+ * @param is inputStream to copy
* @param userMetadata this will be written to the s3 ObjectMetadata's userMetadata
* @throws TikaEmitterException or IOException if there is a Runtime s3 client exception
*/
@Override
- public void emit(String path, InputStream is, Metadata userMetadata) throws IOException, TikaEmitterException {
+ public void emit(String path, InputStream is, Metadata userMetadata)
+ throws IOException, TikaEmitterException {
if (!StringUtils.isBlank(prefix)) {
path = prefix + "/" + path;
}
- if (! StringUtils.isBlank(fileExtension)) {
+ if (!StringUtils.isBlank(fileExtension)) {
path += "." + fileExtension;
}
- LOGGER.debug("about to emit to target bucket: ({}) path:({})",
- bucket, path);
+ LOGGER.debug("about to emit to target bucket: ({}) path:({})", bucket, path);
long length = -1;
if (is instanceof TikaInputStream) {
- if (((TikaInputStream)is).hasFile()) {
+ if (((TikaInputStream) is).hasFile()) {
try {
length = ((TikaInputStream) is).getLength();
} catch (IOException e) {
@@ -183,7 +185,8 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
String[] vals = userMetadata.getValues(n);
if (vals.length > 1) {
LOGGER.warn("Can only write the first value for key {}. I see {} values.",
- n, vals.length);
+ n,
+ vals.length);
}
objectMetadata.addUserMetadata(n, vals[0]);
}
@@ -197,6 +200,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
* Whether or not to spool the metadatalist to a tmp file before putting object.
* Default: <code>true</code>. If this is set to <code>false</code>,
* this emitter writes the json object to memory and then puts that into s3.
+ *
* @param spoolToTemp
*/
@Field
@@ -223,7 +227,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
public void setPrefix(String prefix) {
//strip final "/" if it exists
if (prefix.endsWith("/")) {
- this.prefix = prefix.substring(0, prefix.length()-1);
+ this.prefix = prefix.substring(0, prefix.length() - 1);
} else {
this.prefix = prefix;
}
@@ -231,8 +235,9 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
@Field
public void setCredentialsProvider(String credentialsProvider) {
- if (! credentialsProvider.equals("profile") && ! credentialsProvider.equals("instance")) {
- throw new IllegalArgumentException("credentialsProvider must be either 'profile' or instance'");
+ if (!credentialsProvider.equals("profile") && !credentialsProvider.equals("instance")) {
+ throw new IllegalArgumentException(
+ "credentialsProvider must be either 'profile' or instance'");
}
this.credentialsProvider = credentialsProvider;
}
@@ -240,6 +245,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
/**
* If you want to customize the output file's file extension.
* Do not include the "."
+ *
* @param fileExtension
*/
@Field
@@ -248,10 +254,10 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
}
-
/**
* This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
* e.g. AmazonClientException in a TikaConfigException.
+ *
* @param params params to use for initialization
* @throws TikaConfigException
*/
@@ -261,7 +267,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
AWSCredentialsProvider provider = null;
if ("instance".equals(credentialsProvider)) {
provider = InstanceProfileCredentialsProvider.getInstance();
- } else if ("profile".equals(credentialsProvider)){
+ } else if ("profile".equals(credentialsProvider)) {
provider = new ProfileCredentialsProvider(profile);
} else {
throw new TikaConfigException("credentialsProvider must be set and " +
@@ -269,9 +275,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
}
try {
- s3Client = AmazonS3ClientBuilder.standard()
- .withRegion(region)
- .withCredentials(provider)
+ s3Client = AmazonS3ClientBuilder.standard().withRegion(region).withCredentials(provider)
.build();
} catch (AmazonClientException e) {
throw new TikaConfigException("can't initialize s3 emitter", e);
@@ -279,7 +283,8 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
}
@Override
- public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+ public void checkInitialization(InitializableProblemHandler problemHandler)
+ throws TikaConfigException {
mustNotBeEmpty("bucket", this.bucket);
mustNotBeEmpty("region", this.region);
}
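A minimal sketch of the streaming emit path shown above, assuming a config resource that registers the S3Emitter under the hypothetical name "s3" with bucket, region and credentialsProvider set as in the javadoc; file names and keys are placeholders:

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    import org.apache.tika.config.TikaConfig;
    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.pipes.emitter.StreamEmitter;

    public class S3EmitterSketch {
        public static void main(String[] args) throws Exception {
            // config resource and emitter name are assumptions for illustration
            TikaConfig tikaConfig = new TikaConfig(
                    S3EmitterSketch.class.getResourceAsStream("/tika-config-s3.xml"));
            StreamEmitter emitter =
                    (StreamEmitter) tikaConfig.getEmitterManager().getEmitter("s3");

            Metadata userMetadata = new Metadata();
            // only the first value per key is written to the S3 userMetadata
            userMetadata.set("origin", "sketch");

            try (InputStream is = Files.newInputStream(Paths.get("local/file.pdf"))) {
                // with prefix=my-prefix and the default fileExtension=json, this
                // puts the object at my-prefix/docs/file.pdf.json in the bucket
                emitter.emit("docs/file.pdf", is, userMetadata);
            }
        }
    }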
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
index e36d1c8..a831fa9 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
@@ -17,8 +17,8 @@
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>tika-emitters</artifactId>
@@ -102,15 +102,18 @@
</filter>
</filters>
<transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/LICENSE</resource>
<file>target/classes/META-INF/LICENSE</file>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/NOTICE</resource>
<file>target/classes/META-INF/NOTICE</file>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/DEPENDENCIES</resource>
<file>target/classes/META-INF/DEPENDENCIES</file>
</transformer>
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 7fb7f1b..f43963e 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -16,9 +16,23 @@
*/
package org.apache.tika.pipes.emitter.solr;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.zip.GZIPOutputStream;
+
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.http.client.HttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.tika.client.HttpClientFactory;
import org.apache.tika.client.HttpClientUtil;
import org.apache.tika.client.TikaClientException;
@@ -26,39 +40,19 @@ import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.zip.GZIPOutputStream;
public class SolrEmitter extends AbstractEmitter implements Initializable {
- enum AttachmentStrategy {
- SKIP,
- CONCATENATE_CONTENT,
- PARENT_CHILD,
- //anything else?
- }
private static final String ATTACHMENTS = "attachments";
private static final String UPDATE_PATH = "/update";
private static final Logger LOG = LoggerFactory.getLogger(SolrEmitter.class);
//one day this will be allowed or can be configured?
private final boolean gzipJson = false;
-
private AttachmentStrategy attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
private String url;
private String contentField = "content";
@@ -66,55 +60,48 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
private int commitWithin = 100;
private HttpClientFactory httpClientFactory;
private HttpClient httpClient;
-
public SolrEmitter() throws TikaConfigException {
httpClientFactory = new HttpClientFactory();
}
+
@Override
- public void emit(String emitKey, List<Metadata> metadataList) throws IOException,
- TikaEmitterException {
+ public void emit(String emitKey, List<Metadata> metadataList)
+ throws IOException, TikaEmitterException {
if (metadataList == null || metadataList.size() == 0) {
LOG.warn("metadataList is null or empty");
return;
}
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Writer writer = gzipJson ?
- new BufferedWriter(
- new OutputStreamWriter(
- new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
- new BufferedWriter(
- new OutputStreamWriter(bos, StandardCharsets.UTF_8));
- try (
- JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
+ Writer writer = gzipJson ? new BufferedWriter(
+ new OutputStreamWriter(new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
+ new BufferedWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8));
+ try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
jsonGenerator.writeStartArray();
jsonify(jsonGenerator, emitKey, metadataList);
jsonGenerator.writeEndArray();
}
- LOG.debug("emitting json ({})",
- new String(bos.toByteArray(), StandardCharsets.UTF_8));
+ LOG.debug("emitting json ({})", new String(bos.toByteArray(), StandardCharsets.UTF_8));
try {
- HttpClientUtil.postJson(httpClient,
- url+UPDATE_PATH+"?commitWithin="+getCommitWithin(), bos.toByteArray(), gzipJson);
+ HttpClientUtil
+ .postJson(httpClient, url + UPDATE_PATH +
+ "?commitWithin=" + getCommitWithin(),
+ bos.toByteArray(), gzipJson);
} catch (TikaClientException e) {
throw new TikaEmitterException("can't post", e);
}
}
@Override
- public void emit(List<? extends EmitData> batch) throws IOException,
- TikaEmitterException {
+ public void emit(List<? extends EmitData> batch) throws IOException, TikaEmitterException {
if (batch == null || batch.size() == 0) {
LOG.warn("batch is null or empty");
return;
}
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Writer writer = gzipJson ?
- new BufferedWriter(
- new OutputStreamWriter(
- new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
- new BufferedWriter(
- new OutputStreamWriter(bos, StandardCharsets.UTF_8));
+ Writer writer = gzipJson ? new BufferedWriter(
+ new OutputStreamWriter(new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
+ new BufferedWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8));
try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
jsonGenerator.writeStartArray();
for (EmitData d : batch) {
@@ -122,21 +109,22 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
}
jsonGenerator.writeEndArray();
}
- LOG.debug("emitting json ({})",
- new String(bos.toByteArray(), StandardCharsets.UTF_8));
+ LOG.debug("emitting json ({})", new String(bos.toByteArray(), StandardCharsets.UTF_8));
try {
- HttpClientUtil.postJson(httpClient,
- url+UPDATE_PATH+"?commitWithin="+getCommitWithin(),
- bos.toByteArray(), gzipJson);
+ HttpClientUtil
+ .postJson(httpClient, url + UPDATE_PATH +
+ "?commitWithin=" + getCommitWithin(),
+ bos.toByteArray(), gzipJson);
} catch (TikaClientException e) {
throw new TikaEmitterException("can't post", e);
}
}
- private void jsonify(JsonGenerator jsonGenerator, String emitKey, List<Metadata> metadataList) throws IOException {
+ private void jsonify(JsonGenerator jsonGenerator, String emitKey,
+ List<Metadata> metadataList)
+ throws IOException {
metadataList.get(0).set(idField, emitKey);
- if (attachmentStrategy == AttachmentStrategy.SKIP ||
- metadataList.size() == 1) {
+ if (attachmentStrategy == AttachmentStrategy.SKIP || metadataList.size() == 1) {
jsonify(metadataList.get(0), jsonGenerator);
} else if (attachmentStrategy == AttachmentStrategy.CONCATENATE_CONTENT) {
//this only handles text for now, not xhtml
@@ -162,12 +150,13 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
jsonGenerator.writeEndArray();
jsonGenerator.writeEndObject();
} else {
- throw new IllegalArgumentException("I don't yet support this attachment strategy: "
- + attachmentStrategy);
+ throw new IllegalArgumentException(
+ "I don't yet support this attachment strategy: " + attachmentStrategy);
}
}
- private void jsonify(Metadata metadata, JsonGenerator jsonGenerator, boolean writeEndObject) throws IOException {
+ private void jsonify(Metadata metadata, JsonGenerator jsonGenerator, boolean writeEndObject)
+ throws IOException {
jsonGenerator.writeStartObject();
for (String n : metadata.names()) {
String[] vals = metadata.getValues(n);
@@ -213,29 +202,35 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
} else if (attachmentStrategy.equals("parent-child")) {
this.attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
} else {
- throw new IllegalArgumentException("Expected 'skip', 'concatenate-content' or "+
+ throw new IllegalArgumentException("Expected 'skip', 'concatenate-content' or " +
"'parent-child'. I regret I do not recognize: " + attachmentStrategy);
}
}
/**
* Specify the url for Solr
+ *
* @param url
*/
@Field
public void setUrl(String url) {
if (url.endsWith("/")) {
- url = url.substring(0, url.length()-1);
+ url = url.substring(0, url.length() - 1);
}
this.url = url;
}
+ public String getContentField() {
+ return contentField;
+ }
+
/**
* This is the field _after_ metadata mappings have been applied
* that contains the "content" for each metadata object.
- *
+ * <p>
* This is the field that is used if {@link #attachmentStrategy}
* is {@link AttachmentStrategy#CONCATENATE_CONTENT}.
+ *
* @param contentField
*/
@Field
@@ -243,8 +238,8 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
this.contentField = contentField;
}
- public String getContentField() {
- return contentField;
+ public int getCommitWithin() {
+ return commitWithin;
}
@Field
@@ -252,10 +247,6 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
this.commitWithin = commitWithin;
}
- public int getCommitWithin() {
- return commitWithin;
- }
-
/**
* Specify the field in the first Metadata that should be
* used as the id field for the document.
@@ -300,8 +291,14 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
}
@Override
- public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+ public void checkInitialization(InitializableProblemHandler problemHandler)
+ throws TikaConfigException {
}
+ enum AttachmentStrategy {
+ SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
+ //anything else?
+ }
+
}
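The @Field setters touched in this file suggest programmatic configuration roughly like the sketch below; setAttachmentStrategy is inferred from the string-handling hunk above, the Solr URL is a placeholder, and in practice these knobs are driven from tika-config.xml, which also initializes the http client before any emit() call:

    import org.apache.tika.pipes.emitter.solr.SolrEmitter;

    public class SolrEmitterConfigSketch {
        public static void main(String[] args) throws Exception {
            SolrEmitter emitter = new SolrEmitter();
            emitter.setUrl("http://localhost:8983/solr/tika"); // a trailing '/' is stripped
            emitter.setAttachmentStrategy("parent-child"); // or 'skip', 'concatenate-content'
            emitter.setCommitWithin(100);       // appended as ?commitWithin= on /update
            emitter.setContentField("content"); // read by the CONCATENATE_CONTENT strategy
        }
    }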
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
index 069e7a2..049fe96 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
@@ -17,19 +17,20 @@
package org.apache.tika.pipes.emitter.solr;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaException;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.emitter.Emitter;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.filter.MetadataFilter;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.Emitter;
@Ignore("requires solr to be up and running")
public class TestBasic {
@@ -39,8 +40,7 @@ public class TestBasic {
TikaConfig tikaConfig = new TikaConfig(
TestBasic.class.getResourceAsStream("/tika-config-simple-emitter.xml"));
Emitter emitter = tikaConfig.getEmitterManager().getEmitter("solr1");
- List<Metadata> metadataList = getParentChild(tikaConfig,
- "id1", 2);
+ List<Metadata> metadataList = getParentChild(tikaConfig, "id1", 2);
emitter.emit("1", metadataList);
}
@@ -52,17 +52,15 @@ public class TestBasic {
Emitter emitter = tikaConfig.getEmitterManager().getEmitter("solr2");
List<EmitData> emitData = new ArrayList<>();
for (int i = 0; i < 100; i++) {
- List<Metadata> metadataList = getParentChild(tikaConfig,
- "batch_"+i, 4);
- emitData.add(new EmitData(
- new EmitKey(emitter.getName(), "batch_"+i),
- metadataList));
+ List<Metadata> metadataList = getParentChild(tikaConfig, "batch_" + i, 4);
+ emitData.add(new EmitData(new EmitKey(emitter.getName(),
+ "batch_" + i), metadataList));
}
emitter.emit(emitData);
}
- private List<Metadata> getParentChild(TikaConfig tikaConfig,
- String id, int numChildren) throws TikaException {
+ private List<Metadata> getParentChild(TikaConfig tikaConfig, String id, int numChildren)
+ throws TikaException {
List<Metadata> metadataList = new ArrayList<>();
MetadataFilter filter = tikaConfig.getMetadataFilter();
@@ -75,7 +73,7 @@ public class TestBasic {
m1.add(TikaCoreProperties.CREATOR, "secondAuthor");
filter.filter(m1);
metadataList.add(m1);
- for (int i = 1; i < numChildren; i++ ) {
+ for (int i = 1; i < numChildren; i++) {
Metadata m2 = new Metadata();
m2.set(TikaCoreProperties.EMBEDDED_RESOURCE_PATH, "/path_to_this.txt");
m2.set(TikaCoreProperties.TIKA_CONTENT, "fox jumped over the lazy " + i);
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/log4j.properties b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/log4j.properties
index 92b6d56..11e5887 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/log4j.properties
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
#info,debug, error,fatal ...
log4j.rootLogger=debug,stderr
-
#console
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.Target=System.err
-
-log4j.appender.stderr.layout.ConversionPattern= %-5p %m%n
+log4j.appender.stderr.layout.ConversionPattern=%-5p %m%n
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
index 7cb0c02..3063124 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
@@ -16,8 +16,24 @@
*/
package org.apache.tika.pipes.fetchiterator.csv;
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
@@ -29,21 +45,6 @@ import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
import org.apache.tika.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
/**
* Iterates through a UTF-8 CSV file. This adds all columns
@@ -51,17 +52,20 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
* to the metadata object.
* <p>
* <ul>
- * <li>If a 'fetchKeyColumn' is specified, this will use that column's value as the fetchKey.</li>
- * <li>If no 'fetchKeyColumn' is specified, this will send the metadata from the other columns.</li>
+ * <li>If a 'fetchKeyColumn' is specified, this will use that
+ * column's value as the fetchKey.</li>
+ * <li>If no 'fetchKeyColumn' is specified, this will send the
+ * metadata from the other columns.</li>
* <li>The 'fetchKeyColumn' value is not added to the metadata.</li>
* </ul>
* <p>
* <ul>
- * <li>If an 'emitKeyColumn' is specified, this will use that column's value as the emit key.</li>
- * <li>If an 'emitKeyColumn' is not specified, this will use the value from the 'fetchKeyColumn'.</li>
+ * <li>If an 'emitKeyColumn' is specified, this will use that
+ * column's value as the emit key.</li>
+ * <li>If an 'emitKeyColumn' is not specified, this will use
+ * the value from the 'fetchKeyColumn'.</li>
* <li>The 'emitKeyColumn' value is not added to the metadata.</li>
* </ul>
- *
*/
public class CSVFetchIterator extends FetchIterator implements Initializable {
@@ -110,24 +114,21 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
for (CSVRecord record : records) {
String fetchKey = getFetchKey(fetchEmitKeyIndices, record);
String emitKey = getEmitKey(fetchEmitKeyIndices, record);
- if (StringUtils.isBlank(fetchKey) && ! StringUtils.isBlank(fetcherName)) {
+ if (StringUtils.isBlank(fetchKey) && !StringUtils.isBlank(fetcherName)) {
LOGGER.debug("Fetcher specified ({}), but no fetchkey was found in ({})",
fetcherName, record);
}
if (StringUtils.isBlank(emitKey)) {
- throw new IOException("emitKey must not be blank in :"+record);
+ throw new IOException("emitKey must not be blank in :" + record);
}
Metadata metadata = loadMetadata(fetchEmitKeyIndices, headers, record);
- tryToAdd(new FetchEmitTuple(
- new FetchKey(fetcherName, fetchKey),
- new EmitKey(emitterName, emitKey), metadata,
- getOnParseException()));
+ tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
+ new EmitKey(emitterName, emitKey), metadata, getOnParseException()));
}
}
}
- private void checkFetchEmitValidity(String fetcherName,
- String emitterName,
+ private void checkFetchEmitValidity(String fetcherName, String emitterName,
FetchEmitKeyIndices fetchEmitKeyIndices,
List<String> headers) throws IOException {
@@ -135,7 +136,7 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
throw new IOException(new TikaConfigException("must specify at least an emitterName"));
}
- if (StringUtils.isBlank(fetcherName) && ! StringUtils.isBlank(fetchKeyColumn)) {
+ if (StringUtils.isBlank(fetcherName) && !StringUtils.isBlank(fetchKeyColumn)) {
throw new IOException(new TikaConfigException("If specifying a 'fetchKeyColumn', " +
"you must also specify a 'fetcherName'"));
}
@@ -145,17 +146,17 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
}
//if a fetchkeycolumn is specified, make sure that it was found
- if (! StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
- throw new IOException(new TikaConfigException("Couldn't find fetchKeyColumn ("+
- fetchKeyColumn+" in header.\n" +
- "These are the headers I see: " + headers));
+ if (!StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
+ throw new IOException(new TikaConfigException(
+ "Couldn't find fetchKeyColumn (" + fetchKeyColumn + " in header.\n" +
+ "These are the headers I see: " + headers));
}
//if an emitkeycolumn is specified, make sure that it was found
- if (! StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
- throw new IOException(new TikaConfigException("Couldn't find emitKeyColumn ("+
- emitKeyColumn+" in header.\n" +
- "These are the headers I see: " + headers));
+ if (!StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
+ throw new IOException(new TikaConfigException(
+ "Couldn't find emitKeyColumn (" + emitKeyColumn + " in header.\n" +
+ "These are the headers I see: " + headers));
}
if (StringUtils.isBlank(emitKeyColumn)) {
@@ -179,7 +180,8 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
return getFetchKey(fetchEmitKeyIndices, record);
}
- private Metadata loadMetadata(FetchEmitKeyIndices fetchEmitKeyIndices, List<String> headers, CSVRecord record) {
+ private Metadata loadMetadata(FetchEmitKeyIndices fetchEmitKeyIndices, List<String> headers,
+ CSVRecord record) {
Metadata metadata = new Metadata();
for (int i = 0; i < record.size(); i++) {
if (fetchEmitKeyIndices.shouldSkip(i)) {
@@ -192,7 +194,7 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
private FetchEmitKeyIndices loadHeaders(CSVRecord record, List<String> headers)
- throws IOException {
+ throws IOException {
int fetchKeyColumnIndex = -1;
int emitKeyColumnIndex = -1;
@@ -200,7 +202,7 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
String header = record.get(col);
if (StringUtils.isBlank(header)) {
throw new IOException(
- new TikaException("Header in column (" +col +") must not be empty"));
+ new TikaException("Header in column (" + col + ") must not be empty"));
}
headers.add(header);
if (header.equals(fetchKeyColumn)) {
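To make the column mapping concrete: under an assumed config with fetchKeyColumn=path and emitKeyColumn=id, a row from a hypothetical CSV maps to a tuple roughly as sketched below; the fetcher/emitter names and the ON_PARSE_EXCEPTION value are illustrative:

    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.pipes.emitter.EmitKey;
    import org.apache.tika.pipes.fetcher.FetchKey;
    import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;

    public class CsvMappingSketch {
        // hypothetical CSV with header "id,project,path" and the row
        // "1,projecta,path/to/my/file1"; the key columns are consumed by the
        // FetchKey/EmitKey and are not copied into the metadata
        static FetchEmitTuple sketch() {
            Metadata metadata = new Metadata();
            metadata.set("project", "projecta"); // every non-key column lands here
            return new FetchEmitTuple(
                    new FetchKey("my-fetcher", "path/to/my/file1"),
                    new EmitKey("my-emitter", "1"),
                    metadata,
                    FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
        }
    }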
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
index 55ae655..fd86a05 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
@@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.pipes.fetchiterator.csv.CSVFetchIterator;
-import org.junit.Test;
+
+import static org.apache.tika.pipes.fetchiterator.FetchIterator.COMPLETED_SEMAPHORE;
+import static org.junit.Assert.assertEquals;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -25,15 +24,16 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import static org.apache.tika.pipes.fetchiterator.FetchIterator.COMPLETED_SEMAPHORE;
-import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.csv.CSVFetchIterator;
public class TestCSVFetchIterator {
@@ -76,10 +76,8 @@ public class TestCSVFetchIterator {
for (MockFetcher f : fetchers) {
for (FetchEmitTuple t : f.pairs) {
String id = t.getMetadata().get("id");
- assertEquals("path/to/my/file"+id,
- t.getFetchKey().getKey());
- assertEquals("project"+
- (Integer.parseInt(id) % 2 == 1 ? "a" : "b"),
+ assertEquals("path/to/my/file" + id, t.getFetchKey().getFetchKey());
+ assertEquals("project" + (Integer.parseInt(id) % 2 == 1 ? "a" : "b"),
t.getMetadata().get("project"));
}
}
@@ -98,12 +96,13 @@ public class TestCSVFetchIterator {
}
private Path get(String testFileName) throws Exception {
- return Paths.get(TestCSVFetchIterator.class.getResource("/"+testFileName).toURI());
+ return Paths.get(TestCSVFetchIterator.class.getResource("/" + testFileName).toURI());
}
private static class MockFetcher implements Callable<Integer> {
private final ArrayBlockingQueue<FetchEmitTuple> queue;
private final List<FetchEmitTuple> pairs = new ArrayList<>();
+
private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
this.queue = queue;
}
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
index 2813ed4..702554a 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
@@ -17,8 +17,8 @@
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.tika</groupId>
@@ -29,10 +29,10 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>tika-fetch-iterator-jdbc</artifactId>
-
+
<name>Apache Tika Fetch Iterator - jdbc</name>
<url>http://tika.apache.org/</url>
-
+
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
@@ -97,15 +97,18 @@
</filter>
</filters>
<transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/LICENSE</resource>
<file>target/classes/META-INF/LICENSE</file>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/NOTICE</resource>
<file>target/classes/META-INF/NOTICE</file>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/DEPENDENCIES</resource>
<file>target/classes/META-INF/DEPENDENCIES</file>
</transformer>
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index 112f52a..f1ca52c 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -16,19 +16,7 @@
*/
package org.apache.tika.pipes.fetchiterator.jdbc;
-import org.apache.tika.config.Field;
-import org.apache.tika.config.Initializable;
-import org.apache.tika.config.InitializableProblemHandler;
-import org.apache.tika.config.Param;
-import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
import java.io.IOException;
import java.sql.Connection;
@@ -42,7 +30,20 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.utils.StringUtils;
/**
* Iterates through the results from a sql call via jdbc. This adds all columns
@@ -50,8 +51,10 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
* to the metadata object.
* <p>
* <ul>
- * <li>If a 'fetchKeyColumn' is specified, this will use that column's value as the fetchKey.</li>
- * <li>If no 'fetchKeyColumn' is specified, this will send the metadata from the other columns.</li>
+ * <li>If a 'fetchKeyColumn' is specified, this will use that
+ * column's value as the fetchKey.</li>
+ * <li>If no 'fetchKeyColumn' is specified, this will send the
+ * metadata from the other columns.</li>
* <li>The 'fetchKeyColumn' value is not added to the metadata.</li>
* </ul>
* <p>
@@ -59,7 +62,6 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
* <li>An 'emitKeyColumn' must be specified</li>
* <li>The 'emitKeyColumn' value is not added to the metadata.</li>
* </ul>
- *
*/
public class JDBCFetchIterator extends FetchIterator implements Initializable {
@@ -88,15 +90,15 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
this.connection = connection;
}
+ public String getSelect() {
+ return select;
+ }
+
@Field
public void setSelect(String select) {
this.select = select;
}
- public String getSelect() {
- return select;
- }
-
@Override
protected void enqueue() throws InterruptedException, IOException, TimeoutException {
String fetcherName = getFetcherName();
@@ -110,16 +112,17 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
while (rs.next()) {
if (headers.size() == 0) {
fetchEmitKeyIndices = loadHeaders(rs.getMetaData(), headers);
- checkFetchEmitValidity(fetcherName, emitterName, fetchEmitKeyIndices, headers);
+ checkFetchEmitValidity(fetcherName, emitterName, fetchEmitKeyIndices,
+ headers);
}
try {
processRow(fetcherName, emitterName, headers, fetchEmitKeyIndices, rs);
} catch (SQLException e) {
- LOGGER.warn("Failed to insert: "+rs, e);
+ LOGGER.warn("Failed to insert: " + rs, e);
}
rowCount++;
if (rowCount % 1000 == 0) {
- LOGGER.info("added "+rowCount + " rows to the queue");
+ LOGGER.info("added " + rowCount + " rows to the queue");
}
}
}
@@ -129,21 +132,23 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
} finally {
try {
db.close();
- } catch (SQLException e){
+ } catch (SQLException e) {
LOGGER.warn("failed to close connection", e);
}
}
}
- private void checkFetchEmitValidity(String fetcherName,
- String emitterName,
+
+ private void checkFetchEmitValidity(String fetcherName, String emitterName,
FetchEmitKeyIndices fetchEmitKeyIndices,
List<String> headers) throws IOException {
- if (! StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
- throw new IOException(new TikaConfigException("Couldn't find column: "+fetchKeyColumn));
+ if (!StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
+ throw new IOException(
+ new TikaConfigException("Couldn't find column: " + fetchKeyColumn));
}
- if (! StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
- throw new IOException(new TikaConfigException("Couldn't find column: "+emitKeyColumn));
+ if (!StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
+ throw new IOException(
+ new TikaConfigException("Couldn't find column: " + emitKeyColumn));
}
}
@@ -165,7 +170,7 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
if (i == fetchEmitKeyIndices.emitKeyIndex) {
emitKey = getString(i, rs);
if (emitKey == null) {
- LOGGER.debug("emitKey is empty for record "+toString(rs));
+ LOGGER.debug("emitKey is empty for record " + toString(rs));
}
emitKey = (emitKey == null) ? "" : emitKey;
continue;
@@ -176,10 +181,8 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
}
}
- tryToAdd(new FetchEmitTuple(
- new FetchKey(fetcherName, fetchKey),
- new EmitKey(emitterName, emitKey),
- metadata, getOnParseException()));
+ tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
+ new EmitKey(emitterName, emitKey), metadata, getOnParseException()));
}
private String toString(ResultSet rs) throws SQLException {
@@ -188,7 +191,7 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
String val = rs.getString(i);
val = (val == null) ? "" : val;
val = (val.length() > 100) ? val.substring(0, 100) : val;
- sb.append(rs.getMetaData().getColumnLabel(i)+":"+val+"\n");
+ sb.append(rs.getMetaData().getColumnLabel(i) + ":" + val + "\n");
}
return sb.toString();
}
@@ -203,7 +206,8 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
}
- private FetchEmitKeyIndices loadHeaders(ResultSetMetaData metaData, List<String> headers) throws SQLException {
+ private FetchEmitKeyIndices loadHeaders(ResultSetMetaData metaData, List<String> headers)
+ throws SQLException {
int fetchKeyIndex = -1;
int emitKeyIndex = -1;
for (int i = 1; i <= metaData.getColumnCount(); i++) {
@@ -236,7 +240,7 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
mustNotBeEmpty("emitterName", this.getEmitterName());
mustNotBeEmpty("emitKeyColumn", this.emitKeyColumn);
- if (StringUtils.isBlank(getFetcherName()) && ! StringUtils.isBlank(fetchKeyColumn)) {
+ if (StringUtils.isBlank(getFetcherName()) && !StringUtils.isBlank(fetchKeyColumn)) {
throw new TikaConfigException(
"If you specify a 'fetchKeyColumn', you must specify a 'fetcherName'");
}
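
[Editor's sketch] For reference, the configuration that JDBCFetchIterator expects boils down to the following, condensed from the getConfig() helper in the test below. The H2 connection string here is a placeholder, and, per checkInitialization() above, a 'fetcherName' is only required when a 'fetchKeyColumn' is set:

    <fetchIterators>
      <fetchIterator class="org.apache.tika.pipes.fetchiterator.jdbc.JDBCFetchIterator">
        <params>
          <param name="fetcherName" type="string">s3f</param>
          <param name="emitterName" type="string">s3e</param>
          <param name="fetchKeyColumn" type="string">my_fetchkey</param>
          <param name="emitKeyColumn" type="string">my_fetchkey</param>
          <param name="select" type="string">select id as my_id, project as my_project,
              fetchKey as my_fetchKey from fetchkeys</param>
          <param name="connection" type="string">jdbc:h2:file:/tmp/mydb</param>
        </params>
      </fetchIterator>
    </fetchIterators>
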
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
index 9a92fc5..7eebe31 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
@@ -16,13 +16,9 @@
*/
package org.apache.tika.pipes.fetchiterator.jdbc;
-import org.apache.commons.io.FileUtils;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -44,35 +40,41 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
public class TestJDBCFetchIterator {
static final String TABLE = "fetchkeys";
- static Connection CONNECTION;
- static Path DB_DIR;
static final String db = "mydb";
private static final int NUM_ROWS = 1000;
+ static Connection CONNECTION;
+ static Path DB_DIR;
@BeforeClass
public static void setUp() throws Exception {
DB_DIR = Files.createTempDirectory("tika-jdbc-fetchiterator-test-");
- CONNECTION = DriverManager.getConnection("jdbc:h2:file:"+DB_DIR.toAbsolutePath()+"/"+db);
- String sql = "create table "+TABLE +
- " (id varchar(128), " +
+ CONNECTION =
+ DriverManager.getConnection("jdbc:h2:file:" +
+ DB_DIR.toAbsolutePath() + "/" + db);
+ String sql = "create table " + TABLE + " (id varchar(128), " +
"project varchar(128), " +
"fetchKey varchar(128))";
CONNECTION.createStatement().execute(sql);
for (int i = 0; i < NUM_ROWS; i++) {
- sql = "insert into "+TABLE + " (id, project, fetchKey) values ('id"+i+"','project"+
- (i%2 == 0 ? "a" : "b") +"','fk"+i+"')";
+ sql = "insert into " + TABLE + " (id, project, fetchKey) values ('id" + i +
+ "','project" + (i % 2 == 0 ? "a" : "b") + "','fk" + i + "')";
CONNECTION.createStatement().execute(sql);
}
- sql = "select count(1) from "+ TABLE;
+ sql = "select count(1) from " + TABLE;
ResultSet rs = CONNECTION.createStatement().executeQuery(sql);
while (rs.next()) {
int cnt = rs.getInt(1);
@@ -90,7 +92,7 @@ public class TestJDBCFetchIterator {
TikaConfig tk = getConfig();
int numConsumers = 5;
FetchIterator fetchIterator = tk.getFetchIterator();
- ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
+ ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
ExecutorCompletionService<Integer> completionService =
new ExecutorCompletionService<>(es);
ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(100);
@@ -118,16 +120,16 @@ public class TestJDBCFetchIterator {
Matcher m = Pattern.compile("fk(\\d+)").matcher("");
for (MockFetcher f : fetchers) {
for (FetchEmitTuple p : f.pairs) {
- String k = p.getFetchKey().getKey();
+ String k = p.getFetchKey().getFetchKey();
String num = "";
if (m.reset(k).find()) {
num = m.group(1);
} else {
- fail("failed to find key pattern: "+k);
+ fail("failed to find key pattern: " + k);
}
String aOrB = Integer.parseInt(num) % 2 == 0 ? "a" : "b";
- assertEquals("id"+num, p.getMetadata().get("MY_ID"));
- assertEquals("project"+aOrB, p.getMetadata().get("MY_PROJECT"));
+ assertEquals("id" + num, p.getMetadata().get("MY_ID"));
+ assertEquals("project" + aOrB, p.getMetadata().get("MY_PROJECT"));
assertNull(p.getMetadata().get("fetchKey"));
assertNull(p.getMetadata().get("MY_FETCHKEY"));
cnt++;
@@ -139,17 +141,22 @@ public class TestJDBCFetchIterator {
private TikaConfig getConfig() throws Exception {
String config = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?><properties>\n" +
" <fetchIterators>\n" +
- " <fetchIterator class=\"org.apache.tika.pipes.fetchiterator.jdbc.JDBCFetchIterator\">\n" +
+ " <fetchIterator " +
+ " class=\"org.apache.tika.pipes.fetchiterator.jdbc.JDBCFetchIterator\">\n" +
" <params>\n" +
" <param name=\"fetcherName\" type=\"string\">s3f</param>\n" +
" <param name=\"emitterName\" type=\"string\">s3e</param>\n" +
" <param name=\"queueSize\" type=\"int\">57</param>\n" +
- " <param name=\"fetchKeyColumn\" type=\"string\">my_fetchkey</param>\n" +
- " <param name=\"emitKeyColumn\" type=\"string\">my_fetchkey</param>\n" +
+ " <param name=\"fetchKeyColumn\" " +
+ " type=\"string\">my_fetchkey</param>\n" +
+ " <param name=\"emitKeyColumn\" " +
+ " type=\"string\">my_fetchkey</param>\n" +
" <param name=\"select\" type=\"string\">" +
- "select id as my_id, project as my_project, fetchKey as my_fetchKey from fetchkeys</param>\n" +
- " <param name=\"connection\" type=\"string\">jdbc:h2:file:"+
- DB_DIR.toAbsolutePath()+"/"+db +"</param>\n" +
+ "select id as my_id, project as my_project, fetchKey as my_fetchKey " +
+ "from fetchkeys</param>\n" +
+ " <param name=\"connection\" " +
+ " type=\"string\">jdbc:h2:file:" + DB_DIR.toAbsolutePath() + "/" +
+ db + "</param>\n" +
" </params>\n" +
" </fetchIterator>\n" +
" </fetchIterators>\n" +
@@ -160,6 +167,7 @@ public class TestJDBCFetchIterator {
private static class MockFetcher implements Callable<Integer> {
private final ArrayBlockingQueue<FetchEmitTuple> queue;
private final List<FetchEmitTuple> pairs = new ArrayList<>();
+
private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
this.queue = queue;
}
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties
index 1bee240..2b2da1a 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
#info,debug, error,fatal ...
log4j.rootLogger=info,stderr
-
#console
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.Target=System.err
-
log4j.appender.stderr.layout.ConversionPattern=%-5p [%t]: %m%n
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
index ef8854d..24be277 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
@@ -17,8 +17,8 @@
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.tika</groupId>
@@ -133,15 +133,18 @@
</filter>
</filters>
<transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/LICENSE</resource>
<file>target/classes/META-INF/LICENSE</file>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/NOTICE</resource>
<file>target/classes/META-INF/NOTICE</file>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/DEPENDENCIES</resource>
<file>target/classes/META-INF/DEPENDENCIES</file>
</transformer>
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index b8897a8..988f149 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -16,6 +16,12 @@
*/
package org.apache.tika.pipes.fetchiterator.s3;
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
@@ -24,6 +30,9 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.iterable.S3Objects;
import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
@@ -34,14 +43,6 @@ import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
public class S3FetchIterator extends FetchIterator implements Initializable {
@@ -76,8 +77,9 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
@Field
public void setCredentialsProvider(String credentialsProvider) {
- if (! credentialsProvider.equals("profile") && ! credentialsProvider.equals("instance")) {
- throw new IllegalArgumentException("credentialsProvider must be either 'profile' or instance'");
+ if (!credentialsProvider.equals("profile") && !credentialsProvider.equals("instance")) {
+ throw new IllegalArgumentException(
+ "credentialsProvider must be either 'profile' or instance'");
}
this.credentialsProvider = credentialsProvider;
}
@@ -85,6 +87,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
/**
* This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
* e.g. AmazonClientException in a TikaConfigException.
+ *
* @param params params to use for initialization
* @throws TikaConfigException
*/
@@ -94,7 +97,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
AWSCredentialsProvider provider = null;
if ("instance".equals(credentialsProvider)) {
provider = InstanceProfileCredentialsProvider.getInstance();
- } else if ("profile".equals(credentialsProvider)){
+ } else if ("profile".equals(credentialsProvider)) {
provider = new ProfileCredentialsProvider(profile);
} else {
throw new TikaConfigException("credentialsProvider must be set and " +
@@ -102,9 +105,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
}
try {
- s3Client = AmazonS3ClientBuilder.standard()
- .withRegion(region)
- .withCredentials(provider)
+ s3Client = AmazonS3ClientBuilder.standard().withRegion(region).withCredentials(provider)
.build();
} catch (AmazonClientException e) {
throw new TikaConfigException("can't initialize s3 fetchiterator");
@@ -128,12 +129,10 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
for (S3ObjectSummary summary : S3Objects.withPrefix(s3Client, bucket, prefix)) {
long elapsed = System.currentTimeMillis() - start;
- LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(),
- elapsed);
- tryToAdd(new FetchEmitTuple(
- new FetchKey(fetcherName, summary.getKey()),
- new EmitKey(emitterName, summary.getKey()),
- new Metadata(), getOnParseException()));
+ LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(), elapsed);
+ tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, summary.getKey()),
+ new EmitKey(emitterName, summary.getKey()), new Metadata(),
+ getOnParseException()));
count++;
}
long elapsed = System.currentTimeMillis() - start;
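
[Editor's sketch] No tika-config example for S3FetchIterator ships with this commit, so the following is a hypothetical sketch by analogy with the JDBC iterator config above; the param names are inferred from the @Field setter and the fields referenced in initialize()/enqueue() (bucket, prefix, region, profile, credentialsProvider), and all values are placeholders:

    <fetchIterators>
      <fetchIterator class="org.apache.tika.pipes.fetchiterator.s3.S3FetchIterator">
        <params>
          <param name="fetcherName" type="string">s3f</param>
          <param name="emitterName" type="string">s3e</param>
          <param name="bucket" type="string">my-bucket</param>
          <param name="prefix" type="string">docs/</param>
          <param name="region" type="string">us-east-1</param>
          <param name="profile" type="string">default</param>
          <param name="credentialsProvider" type="string">profile</param>
        </params>
      </fetchIterator>
    </fetchIterators>
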
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
index 567920d..50cb819 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
@@ -15,10 +15,8 @@
* limitations under the License.
*/
package org.apache.tika.pipes.fetchiterator.s3;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.junit.Ignore;
-import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Collections;
@@ -31,7 +29,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
@Ignore("turn into an actual unit test")
public class TestS3FetchIterator {
@@ -48,7 +50,7 @@ public class TestS3FetchIterator {
int numConsumers = 6;
ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(10);
- ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
+ ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
ExecutorCompletionService c = new ExecutorCompletionService(es);
List<MockFetcher> fetchers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
@@ -65,7 +67,7 @@ public class TestS3FetchIterator {
int finished = 0;
int completed = 0;
try {
- while (finished++ < numConsumers+1) {
+ while (finished++ < numConsumers + 1) {
Future<Integer> f = c.take();
completed += f.get();
}
@@ -79,6 +81,7 @@ public class TestS3FetchIterator {
private static class MockFetcher implements Callable<Integer> {
private final ArrayBlockingQueue<FetchEmitTuple> queue;
private final List<FetchEmitTuple> pairs = new ArrayList<>();
+
private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
this.queue = queue;
}
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties
index 1bee240..2b2da1a 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
#info,debug, error,fatal ...
log4j.rootLogger=info,stderr
-
#console
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.Target=System.err
-
log4j.appender.stderr.layout.ConversionPattern=%-5p [%t]: %m%n
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml b/tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml
index 09dcb97..ae76a52 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml
+++ b/tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml
@@ -17,8 +17,8 @@
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>tika-fetchers</artifactId>
@@ -97,15 +97,18 @@
</filter>
</filters>
<transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/LICENSE</resource>
<file>target/classes/META-INF/LICENSE</file>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/NOTICE</resource>
<file>target/classes/META-INF/NOTICE</file>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/DEPENDENCIES</resource>
<file>target/classes/META-INF/DEPENDENCIES</file>
</transformer>
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
index a6d2325..be39765 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
@@ -17,11 +17,19 @@
package org.apache.tika.pipes.fetcher.http;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.tika.client.HttpClientFactory;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
@@ -32,13 +40,6 @@ import org.apache.tika.exception.TikaException;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.fetcher.AbstractFetcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
/**
* Based on Apache httpclient
@@ -52,9 +53,9 @@ public class HttpFetcher extends AbstractFetcher implements Initializable {
public HttpFetcher() throws TikaConfigException {
httpClientFactory = new HttpClientFactory();
}
+
@Override
- public InputStream fetch(String fetchKey, Metadata metadata)
- throws IOException, TikaException {
+ public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
HttpGet get = new HttpGet(fetchKey);
return get(get);
}
@@ -62,7 +63,7 @@ public class HttpFetcher extends AbstractFetcher implements Initializable {
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
throws IOException, TikaException {
HttpGet get = new HttpGet(fetchKey);
- get.setHeader("Range", "bytes="+startRange+"-"+endRange);
+ get.setHeader("Range", "bytes=" + startRange + "-" + endRange);
return get(get);
}
@@ -70,21 +71,18 @@ public class HttpFetcher extends AbstractFetcher implements Initializable {
HttpResponse response = httpClient.execute(get);
int code = response.getStatusLine().getStatusCode();
if (code < 200 || code > 299) {
- throw new IOException("bad status code: "+
- code
- + " :: " +
+ throw new IOException("bad status code: " + code + " :: " +
responseToString(response.getEntity().getContent()));
}
//spool to local
long start = System.currentTimeMillis();
- TikaInputStream tis = TikaInputStream.get(
- response.getEntity().getContent());
+ TikaInputStream tis = TikaInputStream.get(response.getEntity().getContent());
tis.getPath();
if (response instanceof CloseableHttpResponse) {
((CloseableHttpResponse) response).close();
}
- long elapsed = System.currentTimeMillis()-start;
+ long elapsed = System.currentTimeMillis() - start;
LOG.debug("took {} ms to copy to local tmp file", elapsed);
return tis;
}
@@ -149,7 +147,8 @@ public class HttpFetcher extends AbstractFetcher implements Initializable {
}
@Override
- public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+ public void checkInitialization(InitializableProblemHandler problemHandler)
+ throws TikaConfigException {
}
}
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java b/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java
index 8a2d8a1..3a2e127 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java
@@ -16,13 +16,7 @@
*/
package org.apache.tika.pipes.fetcher.http;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.io.TemporaryResources;
-import org.apache.tika.metadata.Metadata;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.xml.sax.SAXException;
+import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.io.InputStream;
@@ -31,32 +25,40 @@ import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.zip.GZIPInputStream;
-import static org.junit.Assert.assertEquals;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.metadata.Metadata;
@Ignore("requires network connectivity")
public class HttpFetcherTest {
- @Test
- public void testRange() throws Exception {
- String url =
- "https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2020-45/segments/1603107869785.9/warc/CC-MAIN-20201020021700-20201020051700-00529.warc.gz";
- long start = 969596307;
- long end = start + 1408 - 1;
- Metadata metadata = new Metadata();
- HttpFetcher httpFetcher = (HttpFetcher) getConfig("tika-config-http.xml")
- .getFetcherManager().getFetcher("http");
- try (TemporaryResources tmp = new TemporaryResources()) {
- Path tmpPath = tmp.createTempFile();
- try (InputStream is = httpFetcher.fetch(url, start, end, metadata)) {
- Files.copy(new GZIPInputStream(is), tmpPath, StandardCopyOption.REPLACE_EXISTING);
- }
- assertEquals(2461, Files.size(tmpPath));
+ @Test
+ public void testRange() throws Exception {
+ String url =
+ "https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2020-45/segments/1603107869785.9/warc/CC-MAIN-20201020021700-20201020051700-00529.warc.gz";
+ long start = 969596307;
+ long end = start + 1408 - 1;
+ Metadata metadata = new Metadata();
+ HttpFetcher httpFetcher =
+ (HttpFetcher) getConfig("tika-config-http.xml").getFetcherManager()
+ .getFetcher("http");
+ try (TemporaryResources tmp = new TemporaryResources()) {
+ Path tmpPath = tmp.createTempFile();
+ try (InputStream is = httpFetcher.fetch(url, start, end, metadata)) {
+ Files.copy(new GZIPInputStream(is), tmpPath, StandardCopyOption.REPLACE_EXISTING);
}
+ assertEquals(2461, Files.size(tmpPath));
}
+ }
TikaConfig getConfig(String path) throws TikaException, IOException, SAXException {
- return new TikaConfig(HttpFetcherTest.class.getResourceAsStream("/"+path));
+ return new TikaConfig(HttpFetcherTest.class.getResourceAsStream("/" + path));
}
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 506a040..a6b34dd 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -16,6 +16,13 @@
*/
package org.apache.tika.pipes.fetcher.s3;
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.regex.Pattern;
+
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
@@ -24,24 +31,18 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
-import org.apache.tika.pipes.fetcher.AbstractFetcher;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
/**
* Fetches files from s3. Example string: s3://my_bucket/path/to/my_file.pdf
@@ -61,27 +62,23 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
private boolean spoolToTemp = true;
@Override
- public InputStream fetch(String fetchKey, Metadata metadata)
- throws TikaException, IOException {
+ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
- LOGGER.debug("about to fetch fetchkey={} from bucket ({})",
- fetchKey, bucket);
+ LOGGER.debug("about to fetch fetchkey={} from bucket ({})", fetchKey, bucket);
try {
S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey));
if (extractUserMetadata) {
- for (Map.Entry<String, String> e :
- s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
+ for (Map.Entry<String, String> e : s3Object.getObjectMetadata().getUserMetadata()
+ .entrySet()) {
metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
}
}
if (!spoolToTemp) {
- return TikaInputStream.get(
- s3Object.getObjectContent());
+ return TikaInputStream.get(s3Object.getObjectContent());
} else {
long start = System.currentTimeMillis();
- TikaInputStream tis = TikaInputStream.get(
- s3Object.getObjectContent());
+ TikaInputStream tis = TikaInputStream.get(s3Object.getObjectContent());
tis.getPath();
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
@@ -95,26 +92,24 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
throws TikaException, IOException {
//TODO -- figure out how to integrate this
- LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})",
- fetchKey, startRange, endRange, bucket);
+ LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})", fetchKey,
+ startRange, endRange, bucket);
try {
- S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey)
- .withRange(startRange, endRange));
+ S3Object s3Object = s3Client.getObject(
+ new GetObjectRequest(bucket, fetchKey).withRange(startRange, endRange));
if (extractUserMetadata) {
- for (Map.Entry<String, String> e :
- s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
+ for (Map.Entry<String, String> e : s3Object.getObjectMetadata().getUserMetadata()
+ .entrySet()) {
metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
}
}
if (!spoolToTemp) {
- return TikaInputStream.get(
- s3Object.getObjectContent());
+ return TikaInputStream.get(s3Object.getObjectContent());
} else {
long start = System.currentTimeMillis();
- TikaInputStream tis = TikaInputStream.get(
- s3Object.getObjectContent());
+ TikaInputStream tis = TikaInputStream.get(s3Object.getObjectContent());
tis.getPath();
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
@@ -157,8 +152,9 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
@Field
public void setCredentialsProvider(String credentialsProvider) {
- if (! credentialsProvider.equals("profile") && ! credentialsProvider.equals("instance")) {
- throw new IllegalArgumentException("credentialsProvider must be either 'profile' or instance'");
+ if (!credentialsProvider.equals("profile") && !credentialsProvider.equals("instance")) {
+ throw new IllegalArgumentException(
+ "credentialsProvider must be either 'profile' or instance'");
}
this.credentialsProvider = credentialsProvider;
}
@@ -166,6 +162,7 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
/**
* This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
* e.g. AmazonClientException in a TikaConfigException.
+ *
* @param params params to use for initialization
* @throws TikaConfigException
*/
@@ -175,7 +172,7 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
AWSCredentialsProvider provider = null;
if ("instance".equals(credentialsProvider)) {
provider = InstanceProfileCredentialsProvider.getInstance();
- } else if ("profile".equals(credentialsProvider)){
+ } else if ("profile".equals(credentialsProvider)) {
provider = new ProfileCredentialsProvider(profile);
} else {
throw new TikaConfigException("credentialsProvider must be set and " +
@@ -183,9 +180,7 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
}
try {
- s3Client = AmazonS3ClientBuilder.standard()
- .withRegion(region)
- .withCredentials(provider)
+ s3Client = AmazonS3ClientBuilder.standard().withRegion(region).withCredentials(provider)
.build();
} catch (AmazonClientException e) {
throw new TikaConfigException("can't initialize s3 fetcher", e);
@@ -193,7 +188,8 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
}
@Override
- public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+ public void checkInitialization(InitializableProblemHandler problemHandler)
+ throws TikaConfigException {
mustNotBeEmpty("bucket", this.bucket);
mustNotBeEmpty("region", this.region);
}
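
[Editor's sketch] The tika-config-s3.xml loaded by TestS3Fetcher below is not part of this diff. A hypothetical sketch of its shape, assuming the element names follow the fetchIterator configs, that the fetcher exposes a 'name' param (to match getFetcher("s3")), and taking the remaining param names from the fields above; all values are placeholders:

    <fetchers>
      <fetcher class="org.apache.tika.pipes.fetcher.s3.S3Fetcher">
        <params>
          <param name="name" type="string">s3</param>
          <param name="bucket" type="string">my-bucket</param>
          <param name="region" type="string">us-east-1</param>
          <param name="profile" type="string">default</param>
          <param name="credentialsProvider" type="string">profile</param>
          <param name="extractUserMetadata" type="bool">true</param>
          <param name="spoolToTemp" type="bool">true</param>
        </params>
      </fetcher>
    </fetchers>
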
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java
index 450d933..9be1ab4 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java
@@ -15,11 +15,6 @@
* limitations under the License.
*/
package org.apache.tika.pipes.fetcher.s3;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.metadata.Metadata;
-import org.junit.Ignore;
-import org.junit.Test;
import java.io.InputStream;
import java.nio.file.Files;
@@ -28,12 +23,19 @@ import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.fetcher.Fetcher;
+
@Ignore("write actual unit tests")
public class TestS3Fetcher {
private static final String FETCH_STRING = "";
- private Path outputFile = Paths.get("");
- private String region = "us-east-1";
- private String profile = "";
+ private final Path outputFile = Paths.get("");
+ private final String region = "us-east-1";
+ private final String profile = "";
@Test
public void testBasic() throws Exception {
@@ -50,9 +52,8 @@ public class TestS3Fetcher {
@Test
public void testConfig() throws Exception {
- TikaConfig config = new TikaConfig(
- this.getClass().getResourceAsStream("/tika-config-s3.xml")
- );
+ TikaConfig config =
+ new TikaConfig(this.getClass().getResourceAsStream("/tika-config-s3.xml"));
Fetcher fetcher = config.getFetcherManager().getFetcher("s3");
Metadata metadata = new Metadata();
try (InputStream is = fetcher.fetch(FETCH_STRING, metadata)) {
diff --git a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java
index 8cbc26b..77619f9 100644
--- a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java
+++ b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java
@@ -16,6 +16,27 @@
*/
package org.apache.tika.client;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidKeyException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.Set;
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.spec.SecretKeySpec;
+import javax.net.ssl.SSLContext;
+
import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
@@ -52,35 +73,15 @@ import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.ssl.SSLContexts;
-import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.crypto.BadPaddingException;
-import javax.crypto.Cipher;
-import javax.crypto.IllegalBlockSizeException;
-import javax.crypto.NoSuchPaddingException;
-import javax.crypto.spec.SecretKeySpec;
-import javax.net.ssl.SSLContext;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.security.InvalidKeyException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.utils.StringUtils;
/**
* This holds quite a bit of state and is not thread safe. Beware!
- *
+ * <p>
* Also, we're currently ignoring the SSL checks. Please open a ticket/PR
* if you need robust SSL.
*/
@@ -104,7 +105,7 @@ public class HttpClientFactory {
private String password;
private String ntDomain;//if using nt credentials
private String authScheme = "basic"; //ntlm or basic
- private boolean credentialsAESEncrypted = false;
+ private final boolean credentialsAESEncrypted = false;
public HttpClientFactory() throws TikaConfigException {
@@ -116,6 +117,7 @@ public class HttpClientFactory {
aes = new AES();
}
}
+
public String getProxyHost() {
return proxyHost;
}
@@ -218,6 +220,7 @@ public class HttpClientFactory {
/**
* only basic and ntlm are supported
+ *
* @param authScheme
*/
public void setAuthScheme(String authScheme) {
@@ -230,19 +233,18 @@ public class HttpClientFactory {
TrustStrategy acceptingTrustStrategy = (cert, authType) -> true;
SSLContext sslContext = null;
try {
- sslContext = SSLContexts.custom().loadTrustMaterial(null,
- acceptingTrustStrategy).build();
+ sslContext =
+ SSLContexts.custom().loadTrustMaterial(
+ null, acceptingTrustStrategy).build();
} catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
throw new TikaConfigException("", e);
}
- SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext,
- NoopHostnameVerifier.INSTANCE);
+ SSLConnectionSocketFactory sslsf =
+ new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
Registry<ConnectionSocketFactory> socketFactoryRegistry =
- RegistryBuilder.<ConnectionSocketFactory>create()
- .register("https", sslsf)
- .register("http", new PlainConnectionSocketFactory())
- .build();
+ RegistryBuilder.<ConnectionSocketFactory>create().register("https", sslsf)
+ .register("http", new PlainConnectionSocketFactory()).build();
PoolingHttpClientConnectionManager manager =
new PoolingHttpClientConnectionManager(socketFactoryRegistry);
@@ -253,19 +255,13 @@ public class HttpClientFactory {
addCredentialsProvider(builder);
addProxy(builder);
return builder.setConnectionManager(manager)
- .setRedirectStrategy(
- new CustomRedirectStrategy(allowedHostsForRedirect))
- .setDefaultRequestConfig(RequestConfig.custom()
- .setTargetPreferredAuthSchemes(Arrays.asList(AuthSchemes.BASIC,
- AuthSchemes.NTLM))
- .setConnectionRequestTimeout((int) requestTimeout)
- .setConnectionRequestTimeout(connectTimeout)
- .setSocketTimeout(socketTimeout)
- .build()
- )
- .setKeepAliveStrategy(getKeepAliveStrategy())
- .setSSLSocketFactory(sslsf)
- .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
+ .setRedirectStrategy(new CustomRedirectStrategy(allowedHostsForRedirect))
+ .setDefaultRequestConfig(RequestConfig.custom().setTargetPreferredAuthSchemes(
+ Arrays.asList(AuthSchemes.BASIC, AuthSchemes.NTLM))
+ .setConnectionRequestTimeout(requestTimeout)
+ .setConnectionRequestTimeout(connectTimeout).setSocketTimeout(socketTimeout)
+ .build()).setKeepAliveStrategy(getKeepAliveStrategy())
+ .setSSLSocketFactory(sslsf).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.build();
}
@@ -285,33 +281,32 @@ public class HttpClientFactory {
if ((StringUtils.isBlank(userName) && !StringUtils.isBlank(password)) ||
(StringUtils.isBlank(password) && !StringUtils.isBlank(userName))) {
- throw new IllegalArgumentException("can't have one of 'username', " +
- "'password' null and the other not");
+ throw new IllegalArgumentException(
+ "can't have one of 'username', " + "'password' null and the other not");
}
String finalUserName = decrypt(userName);
String finalPassword = decrypt(password);
String finalDomain = decrypt(ntDomain);
- CredentialsProvider provider = new BasicCredentialsProvider();
- Credentials credentials = null;
- Registry<AuthSchemeProvider> authSchemeRegistry = null;
- if (authScheme.equals("basic")) {
- credentials = new UsernamePasswordCredentials(finalUserName, finalPassword);
- authSchemeRegistry = RegistryBuilder
- .<AuthSchemeProvider>create()
- .register("basic", new BasicSchemeFactory())
- .build();
- } else if (authScheme.equals("ntlm")) {
- if (StringUtils.isBlank(ntDomain)) {
- throw new IllegalArgumentException("must specify 'ntDomain'");
- }
- credentials = new NTCredentials(finalUserName, finalPassword, null, finalDomain);
- authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
- .register("ntlm", new NTLMSchemeFactory()).build();
+ CredentialsProvider provider = new BasicCredentialsProvider();
+ Credentials credentials = null;
+ Registry<AuthSchemeProvider> authSchemeRegistry = null;
+ if (authScheme.equals("basic")) {
+ credentials = new UsernamePasswordCredentials(finalUserName, finalPassword);
+ authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
+ .register("basic", new BasicSchemeFactory()).build();
+ } else if (authScheme.equals("ntlm")) {
+ if (StringUtils.isBlank(ntDomain)) {
+ throw new IllegalArgumentException("must specify 'ntDomain'");
}
- provider.setCredentials(AuthScope.ANY, credentials);
- builder.setDefaultCredentialsProvider(provider);
- builder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
+ credentials = new NTCredentials(finalUserName, finalPassword,
+ null, finalDomain);
+ authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
+ .register("ntlm", new NTLMSchemeFactory()).build();
+ }
+ provider.setCredentials(AuthScope.ANY, credentials);
+ builder.setDefaultCredentialsProvider(provider);
+ builder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
}
@@ -350,7 +345,7 @@ public class HttpClientFactory {
private static class CustomRedirectStrategy extends LaxRedirectStrategy {
private static final Logger LOG = LoggerFactory.getLogger(CustomRedirectStrategy.class);
- private Set<String> allowedHosts;
+ private final Set<String> allowedHosts;
public CustomRedirectStrategy(Set<String> allowedHosts) {
this.allowedHosts = allowedHosts;
@@ -373,7 +368,9 @@ public class HttpClientFactory {
}
@Override
- public boolean isRedirected(HttpRequest request, HttpResponse response, HttpContext context) throws ProtocolException {
+ public boolean isRedirected(HttpRequest request, HttpResponse response,
+ HttpContext context)
+ throws ProtocolException {
boolean isRedirectedSuper = super.isRedirected(request, response, context);
if (isRedirectedSuper) {
Header locationHeader = response.getFirstHeader("Location");
@@ -422,9 +419,10 @@ public class HttpClientFactory {
try {
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.ENCRYPT_MODE, secretKey);
- return Base64.getEncoder()
- .encodeToString(cipher.doFinal(strToEncrypt.getBytes(StandardCharsets.UTF_8)));
- } catch (NoSuchAlgorithmException|InvalidKeyException|NoSuchPaddingException|BadPaddingException|IllegalBlockSizeException e) {
+ return Base64.getEncoder().encodeToString(
+ cipher.doFinal(strToEncrypt.getBytes(StandardCharsets.UTF_8)));
+ } catch (NoSuchAlgorithmException | InvalidKeyException |
+ NoSuchPaddingException | BadPaddingException | IllegalBlockSizeException e) {
throw new TikaConfigException("bad encryption info", e);
}
}
@@ -435,11 +433,8 @@ public class HttpClientFactory {
cipher.init(Cipher.DECRYPT_MODE, secretKey);
return new String(cipher.doFinal(Base64.getDecoder().decode(strToDecrypt)),
StandardCharsets.UTF_8);
- } catch (NoSuchAlgorithmException|
- InvalidKeyException|
- NoSuchPaddingException|
- BadPaddingException|
- IllegalBlockSizeException e) {
+ } catch (NoSuchAlgorithmException | InvalidKeyException |
+ NoSuchPaddingException | BadPaddingException | IllegalBlockSizeException e) {
throw new TikaConfigException("bad encryption info", e);
}
}
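
[Editor's sketch] A minimal usage sketch for the factory. setUserName/setPassword/setNtDomain are assumed setter names for the fields shown above, and build() is assumed to be the method whose body returns builder...build(); check the class for the exact accessors:

    import org.apache.http.client.HttpClient;
    import org.apache.tika.client.HttpClientFactory;

    HttpClientFactory factory = new HttpClientFactory(); // throws TikaConfigException
    factory.setUserName("user");          // hypothetical credentials; assumed setter
    factory.setPassword("secret");        // assumed setter
    factory.setAuthScheme("ntlm");        // only "basic" and "ntlm" are supported
    factory.setNtDomain("MYDOMAIN");      // required for ntlm, see addCredentialsProvider()
    HttpClient client = factory.build();  // assumed builder method; the factory is not thread safe
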
diff --git a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java
index 11ebb3c..3de32da 100644
--- a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java
+++ b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java
@@ -16,20 +16,19 @@
*/
package org.apache.tika.client;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.entity.InputStreamEntity;
import org.apache.http.util.EntityUtils;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
public class HttpClientUtil {
- public static boolean postJson(HttpClient client, String url, String json) throws IOException,
- TikaClientException {
+ public static boolean postJson(HttpClient client, String url, String json)
+ throws IOException, TikaClientException {
HttpPost post = new HttpPost(url);
post.setHeader("Content-Encoding", "gzip");
ByteArrayEntity entity = new ByteArrayEntity(json.getBytes(StandardCharsets.UTF_8));
@@ -41,9 +40,8 @@ public class HttpClientUtil {
if (response.getStatusLine().getStatusCode() != 200) {
String msg = EntityUtils.toString(response.getEntity());
- throw new TikaClientException("Bad status: " +
- response.getStatusLine().getStatusCode() + " : "+
- msg);
+ throw new TikaClientException(
+ "Bad status: " + response.getStatusLine().getStatusCode() + " : " + msg);
} else {
String msg = EntityUtils.toString(response.getEntity());
System.out.println("httputil: " + msg);
@@ -51,9 +49,8 @@ public class HttpClientUtil {
return true;
}
- public static boolean postJson(HttpClient client, String url,
- byte[] bytes, boolean gzipped) throws IOException,
- TikaClientException {
+ public static boolean postJson(HttpClient client, String url, byte[] bytes, boolean gzipped)
+ throws IOException, TikaClientException {
HttpPost post = new HttpPost(url);
if (gzipped) {
post.setHeader("Content-Encoding", "gzip");
@@ -67,9 +64,8 @@ public class HttpClientUtil {
if (response.getStatusLine().getStatusCode() != 200) {
String msg = EntityUtils.toString(response.getEntity());
- throw new TikaClientException("Bad status: " +
- response.getStatusLine().getStatusCode() + " : "+
- msg);
+ throw new TikaClientException(
+ "Bad status: " + response.getStatusLine().getStatusCode() + " : " + msg);
} else {
String msg = EntityUtils.toString(response.getEntity());
System.out.println("httputil: " + msg);
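
[Editor's sketch] Usage of the two postJson overloads above; the URL is a placeholder. Note that the String overload always sets a gzip Content-Encoding header but does not compress the payload, so pre-gzipped bytes should go through the byte[] overload with gzipped=true:

    import org.apache.http.client.HttpClient;
    import org.apache.tika.client.HttpClientFactory;
    import org.apache.tika.client.HttpClientUtil;

    HttpClient client = new HttpClientFactory().build(); // build() assumed, as above
    String json = "{\"id\":\"1\"}";
    // both overloads throw IOException/TikaClientException on a non-200 response
    HttpClientUtil.postJson(client, "http://localhost:9998/emit", json);
    // or, for a payload that has already been gzipped:
    // HttpClientUtil.postJson(client, "http://localhost:9998/emit", gzippedBytes, true);
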
diff --git a/tika-pipes/tika-pipes-async/pom.xml b/tika-pipes/tika-pipes-async/pom.xml
index ded8a0a..5f4e418 100644
--- a/tika-pipes/tika-pipes-async/pom.xml
+++ b/tika-pipes/tika-pipes-async/pom.xml
@@ -17,8 +17,8 @@
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.tika</groupId>
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
index 4860299..cb8347f 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
@@ -1,16 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.pipes.async;
-import org.apache.commons.io.FileUtils;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
@@ -29,6 +36,16 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+
public class AsyncCli {
private static final Logger LOG = LoggerFactory.getLogger(AsyncCli.class);
@@ -37,7 +54,7 @@ public class AsyncCli {
Path configPath = Paths.get(args[0]);
int maxConsumers = 20;
AsyncCli asyncCli = new AsyncCli();
- Path dbDir = Paths.get("/Users/allison/Desktop/tmp-db");//Files.createTempDirectory("tika-async-db-");
+ Path dbDir = Files.createTempDirectory("tika-async-db-");
try {
asyncCli.execute(dbDir, configPath, maxConsumers);
} finally {
@@ -46,6 +63,18 @@ public class AsyncCli {
}
+ private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
+ PreparedStatement findActiveWorkers =
+ connection.prepareStatement("select worker_id from workers");
+ List<Integer> workers = new ArrayList<>();
+ try (ResultSet rs = findActiveWorkers.executeQuery()) {
+ while (rs.next()) {
+ workers.add(rs.getInt(1));
+ }
+ }
+ return workers;
+ }
+
private void execute(Path dbDir, Path configPath, int maxConsumers) throws Exception {
TikaConfig tikaConfig = new TikaConfig(configPath);
@@ -60,18 +89,19 @@ public class AsyncCli {
if (fetchIterator instanceof EmptyFetchIterator) {
throw new IllegalArgumentException("can't have empty fetch iterator");
}
- ArrayBlockingQueue<FetchEmitTuple> q = new ArrayBlockingQueue<>(10000);//fetchIterator.init(maxConsumers);
+ ArrayBlockingQueue<FetchEmitTuple> q =
+ new ArrayBlockingQueue<>(10000);//fetchIterator.init(maxConsumers);
AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(q, connection);
executorCompletionService.submit(fetchIterator);
executorCompletionService.submit(enqueuer);
executorCompletionService.submit(new AssignmentManager(connection, enqueuer));
for (int i = 0; i < maxConsumers; i++) {
- executorCompletionService.submit(new AsyncWorker(connection,
- connectionString, i, configPath));
+ executorCompletionService
+ .submit(new AsyncWorker(connection, connectionString, i, configPath));
}
int completed = 0;
- while (completed < maxConsumers+3) {
+ while (completed < maxConsumers + 3) {
Future<Integer> future = executorCompletionService.take();
if (future != null) {
int val = future.get();
@@ -86,17 +116,13 @@ public class AsyncCli {
private String setupTables(Path dbDir) throws SQLException {
Path dbFile = dbDir.resolve("tika-async");
- String url = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() +
- ";AUTO_SERVER=TRUE";
+ String url = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
Connection connection = DriverManager.getConnection(url);
- String sql = "create table task_queue " +
- "(id bigint auto_increment primary key," +
- "status tinyint," +//byte
- "worker_id integer," +
- "retry smallint," + //short
- "time_stamp timestamp," +
- "json varchar(64000))";
+ String sql = "create table task_queue " + "(id bigint auto_increment primary key," +
+ "status tinyint," + //byte
+ "worker_id integer," + "retry smallint," + //short
+ "time_stamp timestamp," + "json varchar(64000))";
connection.createStatement().execute(sql);
//no clear benefit to creating an index on timestamp
// sql = "CREATE INDEX IF NOT EXISTS status_timestamp on status (time_stamp)";
@@ -106,17 +132,13 @@ public class AsyncCli {
sql = "create table workers_shutdown (worker_id int primary key)";
connection.createStatement().execute(sql);
- sql = "create table error_log (task_id bigint, " +
- "fetch_key varchar(10000)," +
- "time_stamp timestamp," +
- "retry integer," +
- "error_code tinyint)";
+ sql = "create table error_log (task_id bigint, " + "fetch_key varchar(10000)," +
+ "time_stamp timestamp," + "retry integer," + "error_code tinyint)";
connection.createStatement().execute(sql);
return url;
}
-
//this reads FetchEmitTuples from the queue and inserts them in the db
//for the workers to read
private static class AsyncTaskEnqueuer implements Callable<Integer> {
@@ -128,8 +150,8 @@ public class AsyncCli {
private volatile boolean isComplete = false;
- AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue,
- Connection connection) throws SQLException {
+ AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue, Connection connection)
+ throws SQLException {
this.queue = queue;
this.connection = connection;
String sql = "insert into task_queue (status, time_stamp, worker_id, retry, json) " +
@@ -142,7 +164,7 @@ public class AsyncCli {
List<Integer> workers = new ArrayList<>();
while (true) {
FetchEmitTuple t = queue.poll(1, TimeUnit.SECONDS);
- LOG.debug("enqueing to db "+t);
+ LOG.debug("enqueing to db " + t);
if (t == null) {
//log.trace?
} else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
@@ -155,7 +177,7 @@ public class AsyncCli {
while (workers.size() == 0 && elapsed < 600000) {
workers = getActiveWorkers(connection);
Thread.sleep(100);
- elapsed = System.currentTimeMillis()-start;
+ elapsed = System.currentTimeMillis() - start;
}
insert(t, workers);
}
@@ -165,7 +187,9 @@ public class AsyncCli {
boolean isComplete() {
return isComplete;
}
- private void insert(FetchEmitTuple t, List<Integer> workers) throws IOException, SQLException {
+
+ private void insert(FetchEmitTuple t, List<Integer> workers)
+ throws IOException, SQLException {
int workerId = workers.size() == 1 ? workers.get(0) :
workers.get(random.nextInt(workers.size()));
insert.clearParameters();
@@ -190,12 +214,12 @@ public class AsyncCli {
private final Random random = new Random();
- public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer) throws SQLException {
+ public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer)
+ throws SQLException {
this.connection = connection;
this.enqueuer = enqueuer;
//this gets workers and # of tasks in desc order of number of tasks
- String sql = "select w.worker_id, p.cnt " +
- "from workers w " +
+ String sql = "select w.worker_id, p.cnt " + "from workers w " +
"left join (select worker_id, count(1) as cnt from task_queue " +
"where status=0 group by worker_id)" +
" p on p.worker_id=w.worker_id order by p.cnt desc";
@@ -213,14 +237,12 @@ public class AsyncCli {
//current strategy: reallocate tasks from longest queue to shortest
//TODO: might consider randomly shuffling or other algorithms
sql = "update task_queue set worker_id= ? where id in " +
- "(select id from task_queue where " +
- "worker_id = ? and " +
- "rand() < 0.8 " +
+ "(select id from task_queue where " + "worker_id = ? and " + "rand() < 0.8 " +
"and status=0 for update)";
reallocate = connection.prepareStatement(sql);
- sql = "select count(1) from task_queue where status="
- + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
+ sql = "select count(1) from task_queue where status=" +
+ AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
countAvailableTasks = connection.prepareStatement(sql);
sql = "insert into workers_shutdown (worker_id) values (?)";
@@ -251,7 +273,7 @@ public class AsyncCli {
}
private boolean isComplete() throws SQLException {
- if (! enqueuer.isComplete) {
+ if (!enqueuer.isComplete) {
return false;
}
try (ResultSet rs = countAvailableTasks.executeQuery()) {
@@ -299,7 +321,8 @@ public class AsyncCli {
}
- private void reallocateFromMissingWorkers(List<Integer> missingWorkers) throws SQLException {
+ private void reallocateFromMissingWorkers(List<Integer> missingWorkers)
+ throws SQLException {
if (missingWorkers.size() == 0) {
return;
@@ -316,8 +339,7 @@ public class AsyncCli {
allocateNonworkersToWorkers.setInt(1, active);
allocateNonworkersToWorkers.setInt(2, missing);
allocateNonworkersToWorkers.execute();
- LOG.debug("allocating missing working ({}) to ({})",
- missing, active);
+ LOG.debug("allocating missing working ({}) to ({})", missing, active);
}
}
@@ -333,16 +355,4 @@ public class AsyncCli {
return missingWorkers;
}
}
-
- private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
- PreparedStatement findActiveWorkers = connection.prepareStatement(
- "select worker_id from workers");
- List<Integer> workers = new ArrayList<>();
- try (ResultSet rs = findActiveWorkers.executeQuery()) {
- while (rs.next()) {
- workers.add(rs.getInt(1));
- }
- }
- return workers;
- }
}
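
A note on the jdbc url built in setupTables(): H2's AUTO_SERVER=TRUE (mixed mode) is what lets the forked worker and emitter JVMs open the same file-based database concurrently; the first connection starts a server and later connections attach to it. A minimal sketch of what each forked process does with the url it receives through the environment (key name as in this commit):

    String url = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
    try (Connection connection = DriverManager.getConnection(url)) {
        // prepare statements against task_queue / workers / emits here
    }
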
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index a86f563..33964b3 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -1,38 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.pipes.async;
-import org.apache.tika.utils.StringUtils;
-
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import org.apache.tika.utils.StringUtils;
+
public class AsyncConfig {
+ private final int queueSize = 1000;
+ private final int numWorkers = 10;
+ private final int numEmitters = 1;
+ private String jdbcString;
+ private Path dbDir;
+
public static AsyncConfig load(Path p) throws IOException {
AsyncConfig asyncConfig = new AsyncConfig();
if (StringUtils.isBlank(asyncConfig.getJdbcString())) {
asyncConfig.dbDir = Files.createTempDirectory("tika-async-");
Path dbFile = asyncConfig.dbDir.resolve("tika-async");
- asyncConfig.setJdbcString("jdbc:h2:file:" + dbFile.toAbsolutePath().toString() +
- ";AUTO_SERVER=TRUE");
+ asyncConfig.setJdbcString(
+ "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE");
} else {
asyncConfig.dbDir = null;
}
return asyncConfig;
}
- private int queueSize = 1000;
- private int maxConsumers = 10;
- private String jdbcString;
- private Path dbDir;
-
public int getQueueSize() {
return queueSize;
}
- public int getMaxConsumers() {
- return maxConsumers;
+ public int getNumWorkers() {
+ return numWorkers;
+ }
+
+ public int getNumEmitters() {
+ return numEmitters;
}
public String getJdbcString() {
@@ -46,6 +67,7 @@ public class AsyncConfig {
/**
* If no jdbc connection was specified, this
* dir contains the h2 database. Otherwise, null.
+ *
+ * @return the directory holding the temporary h2 database, or null if a jdbc string was configured
*/
public Path getTempDBDir() {
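
As of this commit, load() does not yet read anything from the supplied path; it only falls back to a temp h2 database and the hard-coded defaults above. A sketch of what a caller currently gets back:

    AsyncConfig config = AsyncConfig.load(Paths.get("tika-config.xml"));
    String jdbc = config.getJdbcString();   // jdbc:h2:file:<temp>/tika-async;AUTO_SERVER=TRUE
    int workers = config.getNumWorkers();   // 10
    int emitters = config.getNumEmitters(); // 1
    int queueSize = config.getQueueSize();  // 1000
    Path tmpDb = config.getTempDBDir();     // non-null only for the temp-db fallback
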
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
index d7e058d..df80929 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
@@ -1,20 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.pipes.async;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.emitter.EmitData;
-
-import java.util.List;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
public class AsyncData extends EmitData {
- private final AsyncTask asyncTask;
+ private final long taskId;
+ private final FetchKey fetchKey;
+ private final FetchEmitTuple.ON_PARSE_EXCEPTION onParseException;
+
+ public AsyncData(@JsonProperty("taskId") long taskId,
+ @JsonProperty("fetchKey") FetchKey fetchKey,
+ @JsonProperty("emitKey") EmitKey emitKey, @JsonProperty("onParseException")
+ FetchEmitTuple.ON_PARSE_EXCEPTION onParseException,
+ @JsonProperty("metadataList") List<Metadata> metadataList) {
+ super(emitKey, metadataList);
+ this.taskId = taskId;
+ this.fetchKey = fetchKey;
+ this.onParseException = onParseException;
+ }
+
+ public FetchKey getFetchKey() {
+ return fetchKey;
+ }
- public AsyncData(AsyncTask asyncTask, List<Metadata> metadataList) {
- super(asyncTask.getEmitKey(), metadataList);
- this.asyncTask = asyncTask;
+ public long getTaskId() {
+ return taskId;
}
- public AsyncTask getAsyncTask() {
- return asyncTask;
+ public FetchEmitTuple.ON_PARSE_EXCEPTION getOnParseException() {
+ return onParseException;
}
}
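
AsyncData now travels between the processes as Jackson json rather than java serialization. A round-trip sketch, assuming the two-arg FetchKey/EmitKey constructors, the ON_PARSE_EXCEPTION.EMIT value, and the JsonMetadataSerializer/JsonMetadataDeserializer pair added in this commit:

    ObjectMapper mapper = new ObjectMapper();
    SimpleModule module = new SimpleModule();
    module.addSerializer(Metadata.class, new JsonMetadataSerializer());
    module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
    mapper.registerModule(module);

    Metadata m = new Metadata();
    m.set("dc:title", "quick brown fox");
    AsyncData data = new AsyncData(42L, new FetchKey("fs", "in/doc1.pdf"),
            new EmitKey("fs", "out/doc1"), FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT,
            Collections.singletonList(m));
    byte[] json = mapper.writeValueAsBytes(data);
    AsyncData roundTripped = mapper.readerFor(AsyncData.class).readValue(json);
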
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
index 02d7fec..502c8b4 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.pipes.async;
public interface AsyncEmitHook {
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index 071a1fa..4c44f9c 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -17,38 +17,19 @@
package org.apache.tika.pipes.async;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.utils.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
import java.io.IOException;
-import java.io.StringReader;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AsyncEmitter implements Callable<Integer> {
@@ -56,42 +37,27 @@ public class AsyncEmitter implements Callable<Integer> {
private final String connectionString;
- private final int workerId;
+ private final int emitterId;
private final Path tikaConfigPath;
private final Connection connection;
private final PreparedStatement finished;
private final PreparedStatement restarting;
- private final PreparedStatement selectActiveTasks;
- private final PreparedStatement insertErrorLog;
- private final PreparedStatement resetStatus;
- public AsyncEmitter(Connection connection,
- String connectionString, int workerId,
- Path tikaConfigPath) throws SQLException {
+ public AsyncEmitter(Connection connection, String connectionString, int emitterId,
+ Path tikaConfigPath) throws SQLException {
this.connectionString = connectionString;
- this.workerId = workerId;
+ this.emitterId = emitterId;
this.tikaConfigPath = tikaConfigPath;
this.connection = connection;
- String sql = "update workers set status="+
- AsyncWorkerProcess.WORKER_STATUS_CODES.SHUTDOWN.ordinal()+
- " where worker_id = (" + workerId + ")";
+ String sql = "update emitters set status=" +
+ AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal() +
+ " where emitter_id = (" + emitterId + ")";
finished = connection.prepareStatement(sql);
- sql = "update workers set status="+
- AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal()+
- " where worker_id = (" + workerId + ")";
+ sql = "update emitters set status=" +
+ AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal() +
+ " where emitter_id = (" + emitterId + ")";
restarting = connection.prepareStatement(sql);
- //this checks if the process was able to reset the status
- sql = "select id, retry, json from task_queue where worker_id="
- + workerId +
- " and status=" + AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
- selectActiveTasks = connection.prepareStatement(sql);
-
- //if not, this is called to insert into the error log
- insertErrorLog = prepareInsertErrorLog(connection);
-
- //and this is called to reset the status on error
- resetStatus = prepareReset(connection);
}
@Override
@@ -105,7 +71,7 @@ public class AsyncEmitter implements Callable<Integer> {
if (finished) {
int exitValue = p.exitValue();
if (exitValue == 0) {
- LOG.info("forked emitter process finished with exitValue=0");
+ LOG.debug("forked emitter process finished with exitValue=0");
return 1;
}
reportCrash(++restarts, exitValue);
@@ -121,72 +87,24 @@ public class AsyncEmitter implements Callable<Integer> {
}
private Process start() throws IOException {
- String[] args = new String[]{
- "java", "-Djava.awt.headless=true",
- "-cp", System.getProperty("java.class.path"),
- "org.apache.tika.pipes.async.AsyncEmitterProcess",
- Integer.toString(workerId)
- };
+ String[] args = new String[]{"java", "-Djava.awt.headless=true", "-cp",
+ System.getProperty("java.class.path"),
+ "org.apache.tika.pipes.async.AsyncEmitterProcess", Integer.toString(emitterId)};
ProcessBuilder pb = new ProcessBuilder(args);
pb.environment().put(TIKA_ASYNC_JDBC_KEY, connectionString);
- pb.environment().put(TIKA_ASYNC_CONFIG_FILE_KEY,
- tikaConfigPath.toAbsolutePath().toString());
+ pb.environment()
+ .put(TIKA_ASYNC_CONFIG_FILE_KEY, tikaConfigPath.toAbsolutePath().toString());
pb.inheritIO();
return pb.start();
}
private void reportCrash(int numRestarts, int exitValue) throws SQLException, IOException {
- LOG.warn("worker id={} terminated, exitValue={}",
- workerId, exitValue);
+ LOG.warn("emitter id={} terminated, exitValue={}", emitterId, exitValue);
restarting.execute();
- List<AsyncTask> activeTasks = new ArrayList<>();
- try (ResultSet rs = selectActiveTasks.executeQuery()) {
- long taskId = rs.getLong(1);
- short retry = rs.getShort(2);
- String json = rs.getString(3);
- FetchEmitTuple tuple = JsonFetchEmitTuple.fromJson(new StringReader(json));
- activeTasks.add(new AsyncTask(taskId, retry, tuple));
- }
- if (activeTasks.size() == 0) {
- LOG.info("worker reset active tasks, nothing extra to report");
- return;
- }
- if (activeTasks.size() > 1) {
- LOG.warn("more than one active task? this should never happen!");
- }
-
- for (AsyncTask t : activeTasks) {
- reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN_PARSE,
- insertErrorLog, resetStatus, LOG);
- }
-
- }
-
- static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES errorCode,
- PreparedStatement insertErrorLog, PreparedStatement resetStatus,
- Logger logger) {
- try {
- insertErrorLog.clearParameters();
- insertErrorLog.setLong(1, task.getTaskId());
- insertErrorLog.setString(2, task.getFetchKey().getKey());
- insertErrorLog.setInt(3, task.getRetry());
- insertErrorLog.setByte(4, (byte) errorCode.ordinal());
- insertErrorLog.execute();
- } catch (SQLException e) {
- logger.error("Can't update error log", e);
- }
-
- try {
- resetStatus.clearParameters();
- resetStatus.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
- resetStatus.setShort(2, (short)(task.getRetry()+1));
- resetStatus.setLong(3, task.getTaskId());
- resetStatus.execute();
- } catch (SQLException e) {
- logger.error("Can't reset try status", e);
- }
+ //should we unassign emit tasks here?
}
+ /*
static PreparedStatement prepareInsertErrorLog(Connection connection) throws SQLException {
//if not, this is called to insert into the error log
return connection.prepareStatement(
@@ -203,5 +121,5 @@ public class AsyncEmitter implements Callable<Integer> {
"time_stamp=CURRENT_TIMESTAMP(), " +
"retry=? " +
"where id=?");
- }
+ }*/
}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
index 37c5f3a..03f5b93 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
@@ -16,21 +16,9 @@
*/
package org.apache.tika.pipes.async;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
-import org.apache.commons.io.IOUtils;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.utils.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.file.Paths;
import java.sql.Blob;
import java.sql.Connection;
@@ -42,95 +30,184 @@ import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonMetadataDeserializer;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.utils.ExceptionUtils;
public class AsyncEmitterProcess {
- //TODO -- parameterize these
- private long emitWithinMs = 10000;
- private long emitMaxBytes = 10_000_000;
private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitterProcess.class);
-
- private final LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
+ private final LZ4FastDecompressor decompressor =
+ LZ4Factory.fastestInstance().fastDecompressor();
private final ObjectMapper objectMapper = new ObjectMapper();
+ private final FutureTask<Integer> stdinWatcher;
int recordsPerPulse = 10;
+ //TODO -- parameterize these
+ private final long emitWithinMs = 10000;
+ private final long emitMaxBytes = 10_000_000;
private PreparedStatement markForSelecting;
private PreparedStatement selectForProcessing;
private PreparedStatement emitStatusUpdate;
- private PreparedStatement checkForShutdown;
+ private PreparedStatement checkForCanShutdown;
+ private PreparedStatement checkForShouldShutdown;
+ private PreparedStatement updateEmitterStatus;
+
+ private AsyncEmitterProcess(InputStream stdin) {
+ SimpleModule module = new SimpleModule();
+ module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
+ objectMapper.registerModule(module);
+ stdinWatcher = new FutureTask<>(new ForkWatcher(stdin));
+ new Thread(stdinWatcher).start();
+ }
public static void main(String[] args) throws Exception {
String db = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
- TikaConfig tikaConfig = new TikaConfig(Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY)));
+ TikaConfig tikaConfig =
+ new TikaConfig(Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY)));
int workerId = Integer.parseInt(args[0]);
LOG.debug("trying to get connection {} >{}<", workerId, db);
try (Connection connection = DriverManager.getConnection(db)) {
- AsyncEmitterProcess asyncEmitter = new AsyncEmitterProcess();
+ AsyncEmitterProcess asyncEmitter = new AsyncEmitterProcess(System.in);
asyncEmitter.execute(connection, workerId, tikaConfig);
}
System.exit(0);
}
- private void execute(Connection connection, int workerId,
- TikaConfig tikaConfig) throws SQLException,
- InterruptedException {
+ private static void reportEmitStatus(List<Long> ids,
+ AsyncWorkerProcess.TASK_STATUS_CODES emitted,
+ PreparedStatement emitStatusUpdate) throws SQLException {
+ for (long id : ids) {
+ emitStatusUpdate.clearParameters();
+ emitStatusUpdate.setByte(1, (byte) emitted.ordinal());
+ emitStatusUpdate.setLong(2, id);
+ emitStatusUpdate.addBatch();
+ }
+ emitStatusUpdate.executeBatch();
+ }
+
+ private void execute(Connection connection, int workerId, TikaConfig tikaConfig)
+ throws SQLException, InterruptedException {
prepareStatements(connection, workerId);
+ updateStatus((byte) AsyncWorkerProcess.WORKER_STATUS_CODES.ACTIVE.ordinal());
EmitterManager emitterManager = tikaConfig.getEmitterManager();
- EmitDataCache emitDataCache = new EmitDataCache(emitterManager, emitMaxBytes,
- emitStatusUpdate);
- int shouldShutdown = 0;
+ EmitDataCache emitDataCache =
+ new EmitDataCache(emitterManager, emitMaxBytes, emitStatusUpdate);
+ try {
+ mainLoop(emitDataCache);
+ } finally {
+ emitDataCache.emitAll();
+ updateStatus((byte) AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal());
+ }
+ }
+
+ private void mainLoop(EmitDataCache emitDataCache) throws InterruptedException, SQLException {
+
while (true) {
+ if (shouldShutdown()) {
+ LOG.debug("received should shutdown signal");
+ return;
+ }
int toEmit = markForSelecting.executeUpdate();
+ if (toEmit == 0 && canShutdown()) {
+ //avoid race condition; double check there's nothing
+ //left to emit
+ toEmit = markForSelecting.executeUpdate();
+ if (toEmit == 0) {
+ LOG.debug("received can shutdown and didn't update any for selecting");
+ return;
+ }
+ }
if (toEmit > 0) {
- try (ResultSet rs = selectForProcessing.executeQuery()) {
- while (rs.next()) {
- long id = rs.getLong(1);
- Timestamp ts = rs.getTimestamp(2);
- int uncompressedSize = rs.getInt(3);
- Blob blob = rs.getBlob(4);
- try {
- tryToEmit(id, ts, uncompressedSize, blob,
- emitDataCache);
- } catch (SQLException|IOException e) {
- reportEmitStatus(
- Collections.singletonList(id),
- AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
- emitStatusUpdate
- );
- }
- }
+ try {
+ tryToEmitNextBatch(emitDataCache);
+ } catch (IOException e) {
+ LOG.warn("IOException trying to emit", e);
}
}
if (emitDataCache.exceedsEmitWithin(emitWithinMs)) {
emitDataCache.emitAll();
}
Thread.sleep(500);
- if (shouldShutdown()) {
- shouldShutdown++;
+ }
+ }
+
+ private void tryToEmitNextBatch(EmitDataCache emitDataCache) throws IOException, SQLException {
+ List<AsyncEmitTuple> toEmitList = new ArrayList<>();
+ try (ResultSet rs = selectForProcessing.executeQuery()) {
+ while (rs.next()) {
+ long id = rs.getLong(1);
+ Timestamp ts = rs.getTimestamp(2);
+ int uncompressedSize = rs.getInt(3);
+ Blob blob = rs.getBlob(4);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ IOUtils.copyLarge(blob.getBinaryStream(), bos);
+ byte[] bytes = bos.toByteArray();
+ toEmitList.add(new AsyncEmitTuple(id, ts, uncompressedSize, bytes));
}
- //make sure to test twice
- if (shouldShutdown > 1) {
- emitDataCache.emitAll();
- return;
+ }
+ List<Long> successes = new ArrayList<>();
+ List<Long> exceptions = new ArrayList<>();
+ for (AsyncEmitTuple tuple : toEmitList) {
+ try {
+ tryToEmit(tuple, emitDataCache);
+ successes.add(tuple.id);
+ } catch (IOException | SQLException e) {
+ exceptions.add(tuple.id);
}
}
+ reportEmitStatus(successes, AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS_EMIT,
+ emitStatusUpdate);
+ reportEmitStatus(exceptions, AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
+ emitStatusUpdate);
+
+ }
+
+ private void updateStatus(byte status) throws SQLException {
+ updateEmitterStatus.clearParameters();
+ updateEmitterStatus.setByte(1, status);
+ updateEmitterStatus.executeUpdate();
}
- private void tryToEmit(long id, Timestamp ts,
- int decompressedLength,
- Blob blob, EmitDataCache emitDataCache)
+ private void tryToEmit(AsyncEmitTuple asyncEmitTuple, EmitDataCache emitDataCache)
throws SQLException, IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- IOUtils.copyLarge(blob.getBinaryStream(), bos);
- AsyncData asyncData = deserialize(bos.toByteArray(), decompressedLength);
+ AsyncData asyncData = deserialize(asyncEmitTuple.bytes, asyncEmitTuple.uncompressedSize);
emitDataCache.add(asyncData);
}
boolean shouldShutdown() throws SQLException {
- try (ResultSet rs = checkForShutdown.executeQuery()) {
+ if (stdinWatcher.isDone()) {
+ LOG.info("parent input stream closed; shutting down now");
+ return true;
+ }
+ try (ResultSet rs = checkForShouldShutdown.executeQuery()) {
+ if (rs.next()) {
+ int val = rs.getInt(1);
+ return val > 0;
+ }
+ }
+ return false;
+ }
+
+ boolean canShutdown() throws SQLException {
+ try (ResultSet rs = checkForCanShutdown.executeQuery()) {
if (rs.next()) {
int val = rs.getInt(1);
return val > 0;
@@ -141,54 +218,57 @@ public class AsyncEmitterProcess {
private void prepareStatements(Connection connection, int workerId) throws SQLException {
String sql = "update task_queue set status=" +
- AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal()+
- ", worker_id="+workerId+", time_stamp=CURRENT_TIMESTAMP()"+
- " where id in " +
- " (select id from task_queue "+//where worker_id = " + workerId +
- " where status="+ AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal()+
- " order by time_stamp asc limit "+recordsPerPulse+" for update)";
+ AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() + ", worker_id=" +
+ workerId + ", time_stamp=CURRENT_TIMESTAMP()" + " where id in " +
+ " (select id from task_queue " + //where worker_id = " + workerId +
+ " where status=" + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal() +
+ " order by time_stamp asc limit " + recordsPerPulse + " for update)";
markForSelecting = connection.prepareStatement(sql);
- sql = "select q.id, q.time_stamp, uncompressed_size, bytes from emits e " +
- "join task_queue q " +
- "where q.status=" +
- AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() +
- " and worker_id=" + workerId +
- " order by time_stamp asc";
+ sql = "select q.id, q.time_stamp, uncompressed_size, bytes " + "from emits e " +
+ "join task_queue q on e.id=q.id " + "where q.status=" +
+ AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() + " and worker_id=" +
+ workerId + " order by time_stamp asc";
selectForProcessing = connection.prepareStatement(sql);
- sql = "update task_queue set status=?"+
- ", time_stamp=CURRENT_TIMESTAMP()"+
- " where id=?";
+ //only update the status if it is not already emitted or failed emit
+ sql = "update task_queue set status=?" + ", time_stamp=CURRENT_TIMESTAMP()" +
+ " where id=? and status not in (" +
+ AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED.ordinal() + ", " +
+ AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT.ordinal() + ")";
+
emitStatusUpdate = connection.prepareStatement(sql);
- sql = "select count(1) from workers where worker_id=" + workerId +
- " and status="+ AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
- checkForShutdown = connection.prepareStatement(sql);
- }
+ sql = "select count(1) from emitters where emitter_id=" + workerId + " and status=" +
+ AsyncWorkerProcess.WORKER_STATUS_CODES.CAN_SHUTDOWN.ordinal();
+ checkForCanShutdown = connection.prepareStatement(sql);
+ sql = "select count(1) from emitters where emitter_id=" + workerId + " and status=" +
+ AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
+ checkForShouldShutdown = connection.prepareStatement(sql);
- private AsyncData deserialize(byte[] compressed, int decompressedLength)
- throws IOException {
- byte[] restored = new byte[decompressedLength];
- int compressedLength2 = decompressor.decompress(compressed, 0, restored,
- 0, decompressedLength);
+ sql = "merge into emitters key (emitter_id) " + "values (" + workerId + ", ? )";
+ updateEmitterStatus = connection.prepareStatement(sql);
+ }
- return objectMapper.readerFor(AsyncTask.class).readValue(restored);
+ private AsyncData deserialize(byte[] compressed, int decompressedLength) throws IOException {
+ byte[] restored = new byte[decompressedLength];
+ int compressedLength2 =
+ decompressor.decompress(compressed, 0, restored, 0, decompressedLength);
+ return objectMapper.readerFor(AsyncData.class).readValue(restored);
}
private static class EmitDataCache {
private final EmitterManager emitterManager;
private final long maxBytes;
private final PreparedStatement emitStatusUpdate;
- private Instant lastAdded = Instant.now();
-
long estimatedSize = 0;
int size = 0;
Map<String, List<AsyncData>> map = new HashMap<>();
+ private Instant lastAdded = Instant.now();
- public EmitDataCache(EmitterManager emitterManager,
- long maxBytes, PreparedStatement emitStatusUpdate) {
+ public EmitDataCache(EmitterManager emitterManager, long maxBytes,
+ PreparedStatement emitStatusUpdate) {
this.emitterManager = emitterManager;
this.maxBytes = maxBytes;
this.emitStatusUpdate = emitStatusUpdate;
@@ -201,10 +281,11 @@ public class AsyncEmitterProcess {
void add(AsyncData data) {
size++;
- long sz = AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getEmitKey(), data.getMetadataList());
+ long sz = AbstractEmitter
+ .estimateSizeInBytes(data.getEmitKey().getEmitKey(), data.getMetadataList());
if (estimatedSize + sz > maxBytes) {
LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
- (estimatedSize+sz), maxBytes);
+ (estimatedSize + sz), maxBytes);
emitAll();
}
List<AsyncData> cached = map.get(data.getEmitKey().getEmitterName());
@@ -240,7 +321,7 @@ public class AsyncEmitterProcess {
throws SQLException {
List<Long> ids = new ArrayList<>();
for (AsyncData d : cachedEmitData) {
- ids.add(d.getAsyncTask().getTaskId());
+ ids.add(d.getTaskId());
}
try {
emitter.emit(cachedEmitData);
@@ -250,27 +331,49 @@ public class AsyncEmitterProcess {
reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
emitStatusUpdate);
}
- reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED,
- emitStatusUpdate);
+ reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED, emitStatusUpdate);
return 1;
}
public boolean exceedsEmitWithin(long emitWithinMs) {
- return ChronoUnit.MILLIS.between(lastAdded, Instant.now())
- > emitWithinMs;
+ return ChronoUnit.MILLIS.between(lastAdded, Instant.now()) > emitWithinMs;
}
}
- private static void reportEmitStatus(List<Long> ids,
- AsyncWorkerProcess.TASK_STATUS_CODES emitted,
- PreparedStatement emitStatusUpdate)
- throws SQLException {
- for (long id : ids) {
- emitStatusUpdate.clearParameters();
- emitStatusUpdate.setByte(1, (byte)emitted.ordinal());
- emitStatusUpdate.setLong(2, id);
- emitStatusUpdate.executeUpdate();
+ private static class ForkWatcher implements Callable<Integer> {
+ private final InputStream in;
+
+ public ForkWatcher(InputStream in) {
+ this.in = in;
+ }
+
+ @Override
+ public Integer call() throws Exception {
+ //this should block forever
+ //if the forking process dies,
+ // this will either throw an IOException or read -1.
+ try {
+ in.read();
+ } finally {
+ LOG.warn("forking process shut down; exiting now");
+ System.exit(0);
+ }
+ return 1;
+ }
+ }
+
+ private static class AsyncEmitTuple {
+ final long id;
+ final Timestamp timestamp;
+ final int uncompressedSize;
+ final byte[] bytes;
+
+ public AsyncEmitTuple(long id, Timestamp timestamp, int uncompressedSize, byte[] bytes) {
+ this.id = id;
+ this.timestamp = timestamp;
+ this.uncompressedSize = uncompressedSize;
+ this.bytes = bytes;
}
}
}
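
deserialize() needs the exact original length up front because lz4's fast decompressor is not self-describing; that is why the emits table carries uncompressed_size next to the blob. A sketch of the matching writer side (the worker's insert is not in this hunk; insertEmit is a hypothetical prepared statement over the emits table):

    LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
    byte[] uncompressed = objectMapper.writeValueAsBytes(asyncData);
    byte[] compressed = compressor.compress(uncompressed);
    // insert into emits (id, time_stamp, uncompressed_size, bytes)
    // values (?, CURRENT_TIMESTAMP(), ?, ?)
    insertEmit.setLong(1, taskId);
    insertEmit.setLong(2, uncompressed.length); // read back as decompressedLength
    insertEmit.setBytes(3, compressed);
    insertEmit.execute();
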
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
index 3828b24..79ed80d 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
@@ -1,12 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.pipes.async;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class AsyncPipesEmitHook implements AsyncEmitHook {
private static final Logger LOG = LoggerFactory.getLogger(AsyncPipesEmitHook.class);
@@ -14,7 +30,7 @@ public class AsyncPipesEmitHook implements AsyncEmitHook {
private final PreparedStatement markSuccess;
private final PreparedStatement markFailure;
- public AsyncPipesEmitHook(Connection connection) throws SQLException {
+ public AsyncPipesEmitHook(Connection connection) throws SQLException {
String sql = "delete from task_queue where id=?";
markSuccess = connection.prepareStatement(sql);
//TODO --fix this
@@ -28,7 +44,7 @@ public class AsyncPipesEmitHook implements AsyncEmitHook {
markSuccess.setLong(1, task.getTaskId());
markSuccess.execute();
} catch (SQLException e) {
- LOG.warn("problem with on success: "+task.getTaskId(), e);
+ LOG.warn("problem with on success: " + task.getTaskId(), e);
}
}
@@ -39,7 +55,7 @@ public class AsyncPipesEmitHook implements AsyncEmitHook {
markFailure.setLong(1, task.getTaskId());
markFailure.execute();
} catch (SQLException e) {
- LOG.warn("problem with on fail: "+task.getTaskId(), e);
+ LOG.warn("problem with on fail: " + task.getTaskId(), e);
}
}
}
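
For anyone wiring in a different hook: a minimal sketch, assuming AsyncEmitHook declares the onSuccess/onFail callbacks that the implementation above appears to override:

    public class LoggingEmitHook implements AsyncEmitHook {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingEmitHook.class);

        @Override
        public void onSuccess(AsyncTask task) {
            LOG.info("emitted task {}", task.getTaskId());
        }

        @Override
        public void onFail(AsyncTask task) {
            LOG.warn("giving up on task {}", task.getTaskId());
        }
    }
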
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index b4d8a8a..efd6fec 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -1,14 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.pipes.async;
-import org.apache.commons.io.FileUtils;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.Closeable;
import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -17,8 +28,10 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -28,44 +41,69 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+
public class AsyncProcessor implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessor.class);
protected static String TIKA_ASYNC_JDBC_KEY = "TIKA_ASYNC_JDBC_KEY";
protected static String TIKA_ASYNC_CONFIG_FILE_KEY = "TIKA_ASYNC_CONFIG_FILE_KEY";
private final Path tikaConfigPath;
- private AsyncConfig asyncConfig;
private final ArrayBlockingQueue<FetchEmitTuple> queue;
private final Connection connection;
- private int finishedThreads = 0;
private final int totalThreads;
+ private final AsyncConfig asyncConfig;
+ private PreparedStatement emittersCanShutdown;
+ private volatile boolean isShuttingDown = false;
+ private AssignmentManager assignmentManager;
+ private int finishedThreads = 0;
private ExecutorService executorService;
private ExecutorCompletionService<Integer> executorCompletionService;
+ private AsyncProcessor(Path tikaConfigPath) throws SQLException, IOException {
+ this.tikaConfigPath = tikaConfigPath;
+ this.asyncConfig = AsyncConfig.load(tikaConfigPath);
+ this.queue = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
+ this.connection = DriverManager.getConnection(asyncConfig.getJdbcString());
+ this.totalThreads = asyncConfig.getNumWorkers() + asyncConfig.getNumEmitters() +
+ 2;//assignment manager and enqueuer threads
+ }
+
public static AsyncProcessor build(Path tikaConfigPath) throws AsyncRuntimeException {
try {
AsyncProcessor processor = new AsyncProcessor(tikaConfigPath);
-
processor.init();
return processor;
- } catch (SQLException|IOException e) {
+ } catch (SQLException | IOException e) {
throw new AsyncRuntimeException(e);
}
}
- private AsyncProcessor(Path tikaConfigPath) throws SQLException, IOException {
- this.tikaConfigPath = tikaConfigPath;
- this.asyncConfig = AsyncConfig.load(tikaConfigPath);
- this.queue = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
- this.connection = DriverManager.getConnection(asyncConfig.getJdbcString());
- this.totalThreads = asyncConfig.getMaxConsumers() + 2 + 1;
+ private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
+ PreparedStatement findActiveWorkers =
+ connection.prepareStatement("select worker_id from workers");
+ List<Integer> workers = new ArrayList<>();
+ try (ResultSet rs = findActiveWorkers.executeQuery()) {
+ while (rs.next()) {
+ workers.add(rs.getInt(1));
+ }
+ }
+ return workers;
}
- public synchronized boolean offer (
- List<FetchEmitTuple> fetchEmitTuples, long offerMs) throws
- AsyncRuntimeException, InterruptedException {
+ public synchronized boolean offer(List<FetchEmitTuple> fetchEmitTuples, long offerMs)
+ throws AsyncRuntimeException, InterruptedException {
if (queue == null) {
throw new IllegalStateException("queue hasn't been initialized yet.");
+ } else if (isShuttingDown) {
+ throw new IllegalStateException(
+ "Can't call offer after calling close() or " + "shutdownNow()");
}
long start = System.currentTimeMillis();
long elapsed = System.currentTimeMillis() - start;
@@ -87,6 +125,12 @@ public class AsyncProcessor implements Closeable {
public synchronized boolean offer(FetchEmitTuple t, long offerMs)
throws AsyncRuntimeException, InterruptedException {
+ if (queue == null) {
+ throw new IllegalStateException("queue hasn't been initialized yet.");
+ } else if (isShuttingDown) {
+ throw new IllegalStateException(
+ "Can't call offer after calling close() or " + "shutdownNow()");
+ }
checkActive();
return queue.offer(t, offerMs, TimeUnit.MILLISECONDS);
}
@@ -99,8 +143,7 @@ public class AsyncProcessor implements Closeable {
* @throws AsyncRuntimeException
* @throws InterruptedException
*/
- public synchronized boolean checkActive()
- throws AsyncRuntimeException, InterruptedException {
+ public synchronized boolean checkActive() throws AsyncRuntimeException, InterruptedException {
Future<Integer> future = executorCompletionService.poll();
if (future != null) {
try {
@@ -110,74 +153,87 @@ public class AsyncProcessor implements Closeable {
}
finishedThreads++;
}
- if (finishedThreads == totalThreads) {
- return false;
- }
- return true;
+ return finishedThreads != totalThreads;
}
private void init() throws SQLException {
setupTables();
+ String sql = "update emitters set status=" +
+ AsyncWorkerProcess.WORKER_STATUS_CODES.CAN_SHUTDOWN.ordinal();
+ this.emittersCanShutdown = connection.prepareStatement(sql);
+ executorService = Executors.newFixedThreadPool(totalThreads);
+ executorCompletionService = new ExecutorCompletionService<>(executorService);
- executorService = Executors.newFixedThreadPool(
- totalThreads);
- executorCompletionService =
- new ExecutorCompletionService<>(executorService);
+ AsyncTaskEnqueuer taskEnqueuer = new AsyncTaskEnqueuer(queue, connection);
- AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(queue, connection);
+ executorCompletionService.submit(taskEnqueuer);
- executorCompletionService.submit(enqueuer);
- executorCompletionService.submit(new AssignmentManager(connection, enqueuer));
- //executorCompletionService.submit(new )
- for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
- executorCompletionService.submit(new AsyncWorker(connection,
- asyncConfig.getJdbcString(), i, tikaConfigPath));
+ List<AsyncWorker> workers = buildWorkers(connection, asyncConfig, tikaConfigPath);
+ int maxRetries = 0;
+ for (AsyncWorker worker : workers) {
+ if (worker.getMaxRetries() > maxRetries) {
+ maxRetries = worker.getMaxRetries();
+ }
+ executorCompletionService.submit(worker);
+ }
+ assignmentManager = new AssignmentManager(connection, taskEnqueuer, maxRetries);
+ executorCompletionService.submit(assignmentManager);
+ for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
+ executorCompletionService
+ .submit(new AsyncEmitter(connection, asyncConfig.getJdbcString(),
+ asyncConfig.getNumWorkers() + i, tikaConfigPath));
}
- executorCompletionService.submit(new AsyncEmitter(connection,
- asyncConfig.getJdbcString(), asyncConfig.getMaxConsumers(),
- tikaConfigPath));
+ }
+
+ private List<AsyncWorker> buildWorkers(Connection connection, AsyncConfig asyncConfig,
+ Path tikaConfigPath) throws SQLException {
+ //TODO -- make these workers configurable via the tika config, e.g. max retries
+ //and jvm args, etc.
+ List<AsyncWorker> workers = new ArrayList<>();
+ for (int i = 0; i < asyncConfig.getNumWorkers(); i++) {
+ workers.add(
+ new AsyncWorker(connection, asyncConfig.getJdbcString(), i, tikaConfigPath));
+ }
+ return workers;
}
private void setupTables() throws SQLException {
- String sql = "create table task_queue " +
- "(id bigint auto_increment primary key," +
- "status tinyint," +//byte
- "worker_id integer," +
- "retry smallint," + //short
+ String sql = "create table task_queue " + "(id bigint auto_increment primary key," +
+ "status tinyint," + //byte
+ "worker_id integer," + "retry smallint," + //short
"time_stamp timestamp," +
"json varchar(64000))";//this is the AsyncTask ... not the emit data!
try (Statement st = connection.createStatement()) {
st.execute(sql);
}
- //no clear benefit to creating an index on timestamp
-// sql = "CREATE INDEX IF NOT EXISTS status_timestamp on status (time_stamp)";
sql = "create table workers (worker_id int primary key, status tinyint)";
try (Statement st = connection.createStatement()) {
st.execute(sql);
}
- sql = "create table error_log (task_id bigint, " +
- "fetch_key varchar(10000)," +
- "time_stamp timestamp," +
- "retry integer," +
- "error_code tinyint)";
+ sql = "create table emitters (emitter_id int primary key, status tinyint)";
+ try (Statement st = connection.createStatement()) {
+ st.execute(sql);
+ }
+
+ sql = "create table error_log (task_id bigint, " + "fetch_key varchar(10000)," +
+ "time_stamp timestamp," + "retry integer," + "error_code tinyint)";
+
try (Statement st = connection.createStatement()) {
st.execute(sql);
}
- sql = "create table emits (" +
- "id bigint primary key, " +
- "time_stamp timestamp, "+
- "uncompressed_size bigint, " +
- "bytes blob)";
+ sql = "create table emits (" + "id bigint primary key, " + "time_stamp timestamp, " +
+ "uncompressed_size bigint, " + "bytes blob)";
try (Statement st = connection.createStatement()) {
st.execute(sql);
}
}
public void shutdownNow() throws IOException, AsyncRuntimeException {
+ isShuttingDown = true;
try {
executorService.shutdownNow();
} finally {
@@ -189,50 +245,65 @@ public class AsyncProcessor implements Closeable {
}
/**
- * This is a blocking close. If you need to shutdown immediately,
- * try {@link #shutdownNow()}.
+ * This is a blocking close. It will wait for all tasks successfully submitted before this
+ * call to close() to complete before closing. If you need to shutdown immediately, try
+ * {@link #shutdownNow()}.
+ *
* @throws IOException
*/
@Override
public void close() throws IOException {
+ isShuttingDown = true;
try {
- for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
- try {
- //blocking
- queue.put(FetchIterator.COMPLETED_SEMAPHORE);
- } catch (InterruptedException e) {
- //swallow
- }
- }
- //TODO: clean this up
- String sql = "update workers set status="+
- AsyncWorkerProcess.WORKER_STATUS_CODES.SHUTDOWN.ordinal()+
- " where worker_id = (" + asyncConfig.getMaxConsumers() + ")";
- try {
- connection.prepareStatement(sql).execute();
- } catch (SQLException throwables) {
- throwables.printStackTrace();
- }
- long start = System.currentTimeMillis();
- long elapsed = System.currentTimeMillis() - start;
- try {
- boolean isActive = checkActive();
- while (isActive) {
- isActive = checkActive();
- elapsed = System.currentTimeMillis();
- }
- } catch (InterruptedException e) {
- return;
- }
+ completeAndShutdown();
+ } catch (SQLException | InterruptedException e) {
+ throw new IOException(e);
} finally {
executorService.shutdownNow();
+ SQLException ex = null;
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ ex = e;
+ }
//close down processes and db
if (asyncConfig.getTempDBDir() != null) {
FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
}
+ if (ex != null) {
+ throw new IOException(ex);
+ }
}
}
+ //this will block until everything finishes
+ private void completeAndShutdown() throws SQLException, InterruptedException {
+
+ //blocking...notify taskEnqueuer
+ queue.put(FetchIterator.COMPLETED_SEMAPHORE);
+
+ //wait for assignmentManager to finish
+ //it will only complete after the task enqueuer has completed
+ //and there are no more parse tasks available, selected or in process
+ while (!assignmentManager.hasCompletedTasks()) {
+ Thread.sleep(100);
+ }
+
+ emittersCanShutdown.executeUpdate();
+
+ //wait for emitters to finish
+ long start = System.currentTimeMillis();
+ long elapsed = System.currentTimeMillis() - start;
+ try {
+ boolean isActive = checkActive();
+ while (isActive) {
+ isActive = checkActive();
+ elapsed = System.currentTimeMillis() - start;
+ }
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
//this reads FetchEmitTuples from the queue and inserts them in the db
//for the workers to read
@@ -245,8 +316,8 @@ public class AsyncProcessor implements Closeable {
private volatile boolean isComplete = false;
- AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue,
- Connection connection) throws SQLException {
+ AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue, Connection connection)
+ throws SQLException {
this.queue = queue;
this.connection = connection;
String sql = "insert into task_queue (status, time_stamp, worker_id, retry, json) " +
@@ -268,7 +339,7 @@ public class AsyncProcessor implements Closeable {
} else {
long start = System.currentTimeMillis();
long elapsed = System.currentTimeMillis() - start;
- //TODO -- fix this
+ //TODO -- fix this -- this loop waits for workers to register
while (workers.size() == 0 && elapsed < 600000) {
workers = getActiveWorkers(connection);
Thread.sleep(100);
@@ -283,7 +354,8 @@ public class AsyncProcessor implements Closeable {
return isComplete;
}
- private void insert(FetchEmitTuple t, List<Integer> workers) throws IOException, SQLException {
+ private void insert(FetchEmitTuple t, List<Integer> workers)
+ throws IOException, SQLException {
int workerId = workers.size() == 1 ? workers.get(0) :
workers.get(random.nextInt(workers.size()));
insert.clearParameters();
@@ -305,18 +377,24 @@ public class AsyncProcessor implements Closeable {
private final PreparedStatement reallocate;
private final PreparedStatement countAvailableTasks;
private final PreparedStatement shutdownWorker;
+ private final PreparedStatement findMaxRetrieds;
+ private final PreparedStatement logMaxRetrieds;
+ private final PreparedStatement removeMaxRetrieds;
private final Random random = new Random();
+ private final int maxRetries;
+ private volatile boolean hasCompleted = false;
- public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer) throws SQLException {
+ public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer, int maxRetries)
+ throws SQLException {
this.connection = connection;
this.enqueuer = enqueuer;
+ this.maxRetries = maxRetries;
//this gets workers and # of tasks in desc order of number of tasks
- String sql = "select w.worker_id, p.cnt " +
- "from workers w " +
+ String sql = "select w.worker_id, p.cnt " + "from workers w " +
"left join (select worker_id, count(1) as cnt from task_queue " +
- "where status=0 group by worker_id)" +
- " p on p.worker_id=w.worker_id order by p.cnt desc";
+ "where status=" + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal() +
+ " group by worker_id)" + " p on p.worker_id=w.worker_id order by p.cnt desc";
getQueueDistribution = connection.prepareStatement(sql);
//find workers that have assigned tasks but are not in the
//workers table
@@ -331,26 +409,46 @@ public class AsyncProcessor implements Closeable {
//current strategy: reallocate tasks from longest queue to shortest
//TODO: might consider randomly shuffling or other algorithms
sql = "update task_queue set worker_id= ? where id in " +
- "(select id from task_queue where " +
- "worker_id = ? and " +
- "rand() < 0.8 " +
+ "(select id from task_queue where " + "worker_id = ? and " + "rand() < 0.8 " +
"and status=0 for update)";
reallocate = connection.prepareStatement(sql);
- sql = "select count(1) from task_queue where status="
- + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
+ //get those tasks that are in the parse phase
+ //if they are selected or in process, it is possible that
+ //they'll need to be retried. So, include all statuses
+ //meaning that the parse has not completed.
+ sql = "select count(1) from task_queue where status in (" +
+ AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal() + ", " +
+ AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED.ordinal() + ", " +
+ AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal() + ")";
countAvailableTasks = connection.prepareStatement(sql);
- sql = "update workers set status="+
+ sql = "update workers set status=" +
AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal() +
- " where worker_id = ?";
+ " where worker_id = ?";
shutdownWorker = connection.prepareStatement(sql);
+
+ sql = "select id, retry, json from task_queue where retry >=" + maxRetries;
+ findMaxRetrieds = connection.prepareStatement(sql);
+
+ sql = "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code)" +
+ "values (?,?,CURRENT_TIMESTAMP(), ?," +
+ AsyncWorkerProcess.ERROR_CODES.MAX_RETRIES.ordinal() + ")";
+ logMaxRetrieds = connection.prepareStatement(sql);
+
+ sql = "delete from task_queue where id=?";
+ removeMaxRetrieds = connection.prepareStatement(sql);
+ }
+
+ protected boolean hasCompletedTasks() {
+ return hasCompleted;
}
@Override
public Integer call() throws Exception {
while (true) {
+ removeMaxRetrieds();
List<Integer> missingWorkers = getMissingWorkers();
reallocateFromMissingWorkers(missingWorkers);
redistribute();
@@ -362,6 +460,39 @@ public class AsyncProcessor implements Closeable {
}
}
+ private void removeMaxRetrieds() throws SQLException {
+ Set<Long> toRemove = new HashSet<>();
+ try (ResultSet rs = findMaxRetrieds.executeQuery()) {
+ while (rs.next()) {
+ long id = rs.getLong(1);
+ int retries = rs.getInt(2);
+ String json = rs.getString(3);
+ toRemove.add(id);
+ FetchEmitTuple t;
+ try (Reader reader = new StringReader(json)) {
+ t = JsonFetchEmitTuple.fromJson(reader);
+ } catch (Exception e) {
+ LOG.warn("couldn't deserialize json for task " + id, e);
+ //TODO -- also record this in the error_log table
+ continue;
+ }
+ logMaxRetrieds.clearParameters();
+ logMaxRetrieds.setLong(1, id);
+ logMaxRetrieds.setString(2, t.getFetchKey().getFetchKey());
+ logMaxRetrieds.setInt(3, retries);
+ logMaxRetrieds.addBatch();
+ }
+ }
+ logMaxRetrieds.executeBatch();
+
+ for (Long id : toRemove) {
+ removeMaxRetrieds.clearParameters();
+ removeMaxRetrieds.setLong(1, id);
+ removeMaxRetrieds.addBatch();
+ }
+ removeMaxRetrieds.executeBatch();
+ }
+
private void notifyWorkers() throws SQLException {
for (int workerId : getActiveWorkers(connection)) {
shutdownWorker.clearParameters();
@@ -371,12 +502,19 @@ public class AsyncProcessor implements Closeable {
}
private boolean isComplete() throws SQLException {
+ if (hasCompleted) {
+ return true;
+ }
if (!enqueuer.isComplete) {
return false;
}
try (ResultSet rs = countAvailableTasks.executeQuery()) {
while (rs.next()) {
- return rs.getInt(1) == 0;
+ int availTasks = rs.getInt(1);
+ if (availTasks == 0) {
+ hasCompleted = true;
+ return true;
+ }
}
}
return false;
@@ -419,7 +557,8 @@ public class AsyncProcessor implements Closeable {
}
- private void reallocateFromMissingWorkers(List<Integer> missingWorkers) throws SQLException {
+ private void reallocateFromMissingWorkers(List<Integer> missingWorkers)
+ throws SQLException {
if (missingWorkers.size() == 0) {
return;
@@ -436,8 +575,7 @@ public class AsyncProcessor implements Closeable {
allocateNonworkersToWorkers.setInt(1, active);
allocateNonworkersToWorkers.setInt(2, missing);
allocateNonworkersToWorkers.execute();
- LOG.debug("allocating missing working ({}) to ({})",
- missing, active);
+ LOG.debug("allocating missing working ({}) to ({})", missing, active);
}
}
@@ -453,16 +591,4 @@ public class AsyncProcessor implements Closeable {
return missingWorkers;
}
}
-
- private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
- PreparedStatement findActiveWorkers = connection.prepareStatement(
- "select worker_id from workers");
- List<Integer> workers = new ArrayList<>();
- try (ResultSet rs = findActiveWorkers.executeQuery()) {
- while (rs.next()) {
- workers.add(rs.getInt(1));
- }
- }
- return workers;
- }
}
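
Aside: the max-retries sweep added above (findMaxRetrieds -> logMaxRetrieds ->
removeMaxRetrieds) reduces to a find/log/delete cycle against task_queue. Below is
a minimal, self-contained sketch of that cycle against an in-memory H2 database;
the class name, the two-column schema, and the skipped error_log write are
simplifications for illustration, not the real DDL.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.ArrayList;
    import java.util.List;

    public class MaxRetriesSweepSketch {
        public static void main(String[] args) throws Exception {
            final int maxRetries = 2; // same default AsyncWorker hard-codes
            try (Connection c = DriverManager.getConnection("jdbc:h2:mem:sweep")) {
                c.createStatement().execute("create table task_queue (id bigint " +
                        "auto_increment primary key, retry int, json varchar(20000))");
                c.createStatement().execute(
                        "insert into task_queue (retry, json) values (1, '{}'), (3, '{}')");
                // 1) find tasks that have exhausted their retries
                List<Long> exhausted = new ArrayList<>();
                try (ResultSet rs = c.createStatement().executeQuery(
                        "select id from task_queue where retry >= " + maxRetries)) {
                    while (rs.next()) {
                        exhausted.add(rs.getLong(1));
                    }
                }
                // 2) the real code records each id in error_log at this point
                // 3) delete them so no worker can ever select them again
                PreparedStatement delete =
                        c.prepareStatement("delete from task_queue where id=?");
                for (long id : exhausted) {
                    delete.setLong(1, id);
                    delete.addBatch();
                }
                delete.executeBatch();
                System.out.println("removed " + exhausted.size() + " exhausted task(s)");
            }
        }
    }

Batching the deletes, as the real code does, keeps each sweep pass to a bounded
number of round trips.
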
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
index 24acdf8..e0c214a 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
@@ -1,20 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.pipes.async;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.tika.pipes.emitter.EmitKey;
+
import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
public class AsyncTask extends FetchEmitTuple {
- public static final AsyncTask SHUTDOWN_SEMAPHORE
- = new AsyncTask(-1, (short)-1, new FetchEmitTuple(null, null, null));
-
- private long taskId;
+ public static final AsyncTask SHUTDOWN_SEMAPHORE =
+ new AsyncTask(-1, (short) -1, new FetchEmitTuple(null, null, null));
private final short retry;
+ private long taskId;
public AsyncTask(@JsonProperty("taskId") long taskId, @JsonProperty("retry") short retry,
@JsonProperty("fetchEmitTuple") FetchEmitTuple fetchEmitTuple) {
- super(fetchEmitTuple.getFetchKey(), fetchEmitTuple.getEmitKey(), fetchEmitTuple.getMetadata());
+ super(fetchEmitTuple.getFetchKey(), fetchEmitTuple.getEmitKey(),
+ fetchEmitTuple.getMetadata());
this.taskId = taskId;
this.retry = retry;
}
@@ -23,18 +39,16 @@ public class AsyncTask extends FetchEmitTuple {
return taskId;
}
+ public void setTaskId(long taskId) {
+ this.taskId = taskId;
+ }
+
public short getRetry() {
return retry;
}
- public void setTaskId(long taskId) {
- this.taskId = taskId;
- }
@Override
public String toString() {
- return "AsyncTask{" +
- "taskId=" + taskId +
- ", retry=" + retry +
- '}';
+ return "AsyncTask{" + "taskId=" + taskId + ", retry=" + retry + '}';
}
}
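
Note on SHUTDOWN_SEMAPHORE above: it is a poison pill, a single shared instance
that consumers compare by reference so no real task can ever collide with it. A
sketch of the pattern follows; the queue, the string payload, and the loop are
illustrative stand-ins, not the actual worker loop.

    import java.util.concurrent.ArrayBlockingQueue;

    import org.apache.tika.pipes.async.AsyncTask;

    public class PoisonPillSketch {
        public static void main(String[] args) throws Exception {
            ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
            queue.put("task-1");
            queue.put(AsyncTask.SHUTDOWN_SEMAPHORE); // producer signals "no more work"
            while (true) {
                Object task = queue.take();
                // identity comparison: only the one shared sentinel matches
                if (task == AsyncTask.SHUTDOWN_SEMAPHORE) {
                    System.out.println("received shutdown notification");
                    break;
                }
                System.out.println("processing " + task);
            }
        }
    }
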
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
index 0b685a2..c26ee0f 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
@@ -1,10 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.pipes.async;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.utils.ProcessUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
import java.io.IOException;
import java.io.StringReader;
@@ -18,8 +31,11 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
/**
* This controls monitoring of the AsyncWorkerProcess
@@ -39,26 +55,26 @@ public class AsyncWorker implements Callable<Integer> {
private final PreparedStatement selectActiveTasks;
private final PreparedStatement insertErrorLog;
private final PreparedStatement resetStatus;
+ //TODO: make this configurable
+ private final int maxRetries = 2;
- public AsyncWorker(Connection connection,
- String connectionString, int workerId,
+ public AsyncWorker(Connection connection, String connectionString, int workerId,
Path tikaConfigPath) throws SQLException {
this.connectionString = connectionString;
this.workerId = workerId;
this.tikaConfigPath = tikaConfigPath;
this.connection = connection;
- String sql = "update workers set status="+
- AsyncWorkerProcess.WORKER_STATUS_CODES.SHUTDOWN.ordinal()+
+ String sql = "update workers set status=" +
+ AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal() +
" where worker_id = (" + workerId + ")";
finished = connection.prepareStatement(sql);
- sql = "update workers set status="+
- AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal()+
+ sql = "update workers set status=" +
+ AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal() +
" where worker_id = (" + workerId + ")";
restarting = connection.prepareStatement(sql);
//this checks if the process was able to reset the status
- sql = "select id, retry, json from task_queue where worker_id="
- + workerId +
+ sql = "select id, retry, json from task_queue where worker_id=" + workerId +
" and status=" + AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
selectActiveTasks = connection.prepareStatement(sql);
@@ -69,6 +85,45 @@ public class AsyncWorker implements Callable<Integer> {
resetStatus = prepareReset(connection);
}
+ static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES errorCode,
+ PreparedStatement insertErrorLog, PreparedStatement resetStatus,
+ Logger logger) {
+ try {
+ insertErrorLog.clearParameters();
+ insertErrorLog.setLong(1, task.getTaskId());
+ insertErrorLog.setString(2, task.getFetchKey().getFetchKey());
+ insertErrorLog.setInt(3, task.getRetry());
+ insertErrorLog.setByte(4, (byte) errorCode.ordinal());
+ insertErrorLog.execute();
+ } catch (SQLException e) {
+ logger.error("Can't update error log", e);
+ }
+
+ try {
+ resetStatus.clearParameters();
+ resetStatus.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
+ resetStatus.setShort(2, (short) (task.getRetry() + 1));
+ resetStatus.setLong(3, task.getTaskId());
+ resetStatus.execute();
+ } catch (SQLException e) {
+ logger.error("Can't reset try status", e);
+ }
+ }
+
+ static PreparedStatement prepareInsertErrorLog(Connection connection) throws SQLException {
+ //called to insert a failure row into the error log
+ return connection.prepareStatement(
+ "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code) " +
+ " values (?,?,CURRENT_TIMESTAMP(),?,?)");
+ }
+
+ static PreparedStatement prepareReset(Connection connection) throws SQLException {
+ //called to reset a task's status after an error
+ return connection.prepareStatement(
+ "update task_queue set " + "status=?, " + "time_stamp=CURRENT_TIMESTAMP(), " +
+ "retry=? " + "where id=?");
+ }
+
@Override
public Integer call() throws Exception {
Process p = null;
@@ -80,7 +135,7 @@ public class AsyncWorker implements Callable<Integer> {
if (finished) {
int exitValue = p.exitValue();
if (exitValue == 0) {
- LOG.info("forked worker process finished with exitValue=0");
+ LOG.debug("forked worker process finished with exitValue=0");
return 1;
}
reportCrash(++restarts, exitValue);
@@ -95,24 +150,24 @@ public class AsyncWorker implements Callable<Integer> {
}
}
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
private Process start() throws IOException {
- String[] args = new String[]{
- "java", "-Djava.awt.headless=true",
- "-cp", System.getProperty("java.class.path"),
- "org.apache.tika.pipes.async.AsyncWorkerProcess",
- Integer.toString(workerId)
- };
+ String[] args = new String[]{"java", "-Djava.awt.headless=true", "-cp",
+ System.getProperty("java.class.path"),
+ "org.apache.tika.pipes.async.AsyncWorkerProcess", Integer.toString(workerId)};
ProcessBuilder pb = new ProcessBuilder(args);
pb.environment().put(TIKA_ASYNC_JDBC_KEY, connectionString);
- pb.environment().put(TIKA_ASYNC_CONFIG_FILE_KEY,
- tikaConfigPath.toAbsolutePath().toString());
+ pb.environment()
+ .put(TIKA_ASYNC_CONFIG_FILE_KEY, tikaConfigPath.toAbsolutePath().toString());
pb.inheritIO();
return pb.start();
}
private void reportCrash(int numRestarts, int exitValue) throws SQLException, IOException {
- LOG.warn("worker id={} terminated, exitValue={}",
- workerId, exitValue);
+ LOG.warn("worker id={} terminated, exitValue={}", workerId, exitValue);
restarting.execute();
List<AsyncTask> activeTasks = new ArrayList<>();
try (ResultSet rs = selectActiveTasks.executeQuery()) {
@@ -123,60 +178,18 @@ public class AsyncWorker implements Callable<Integer> {
activeTasks.add(new AsyncTask(taskId, retry, tuple));
}
if (activeTasks.size() == 0) {
- LOG.info("worker reset active tasks, nothing extra to report");
+ LOG.debug("worker reset active tasks, nothing extra to report");
return;
}
+
if (activeTasks.size() > 1) {
LOG.warn("more than one active task? this should never happen!");
}
for (AsyncTask t : activeTasks) {
- reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN_PARSE,
- insertErrorLog, resetStatus, LOG);
- }
-
- }
-
- static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES errorCode,
- PreparedStatement insertErrorLog, PreparedStatement resetStatus,
- Logger logger) {
- try {
- insertErrorLog.clearParameters();
- insertErrorLog.setLong(1, task.getTaskId());
- insertErrorLog.setString(2, task.getFetchKey().getKey());
- insertErrorLog.setInt(3, task.getRetry());
- insertErrorLog.setByte(4, (byte) errorCode.ordinal());
- insertErrorLog.execute();
- } catch (SQLException e) {
- logger.error("Can't update error log", e);
- }
-
- try {
- resetStatus.clearParameters();
- resetStatus.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
- resetStatus.setShort(2, (short)(task.getRetry()+1));
- resetStatus.setLong(3, task.getTaskId());
- resetStatus.execute();
- } catch (SQLException e) {
- logger.error("Can't reset try status", e);
+ reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN_PARSE, insertErrorLog,
+ resetStatus, LOG);
}
- }
-
- static PreparedStatement prepareInsertErrorLog(Connection connection) throws SQLException {
- //if not, this is called to insert into the error log
- return connection.prepareStatement(
- "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code) " +
- " values (?,?,CURRENT_TIMESTAMP(),?,?)"
- );
- }
- static PreparedStatement prepareReset(Connection connection) throws SQLException {
- //and this is called to reset the status on error
- return connection.prepareStatement(
- "update task_queue set " +
- "status=?, " +
- "time_stamp=CURRENT_TIMESTAMP(), " +
- "retry=? " +
- "where id=?");
}
}
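
The shape of AsyncWorker.call() above is a fork-and-monitor loop: start the child
JVM, poll its exit status with a timeout, and on a nonzero exit report the crash
and restart. Stripped to its skeleton, under the assumption of a placeholder child
command and restart cap (the real loop also consults the workers table):

    import java.util.concurrent.TimeUnit;

    public class ForkMonitorSketch {
        public static void main(String[] args) throws Exception {
            int restarts = 0;
            final int maxRestarts = 3; // illustrative cap, not a value from this commit
            while (restarts <= maxRestarts) {
                Process p = new ProcessBuilder("java", "-version").inheritIO().start();
                // poll rather than block forever so the monitor stays responsive
                while (!p.waitFor(1, TimeUnit.SECONDS)) {
                    // heartbeat / should-shutdown checks would go here
                }
                if (p.exitValue() == 0) {
                    System.out.println("forked process finished cleanly");
                    return;
                }
                System.out.println("crash, exitValue=" + p.exitValue() +
                        ", restart #" + (++restarts));
            }
        }
    }
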
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
index 8675f22..988748f 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
@@ -1,27 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.pipes.async;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import net.jpountz.lz4.LZ4Factory;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.EncryptedDocumentException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.metadata.serialization.JsonMetadata;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.parser.Parser;
-import org.apache.tika.parser.RecursiveParserWrapper;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.sax.BasicContentHandlerFactory;
-import org.apache.tika.sax.RecursiveParserWrapperHandler;
-import org.apache.tika.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xml.sax.SAXException;
+import static org.apache.tika.pipes.async.AsyncTask.SHUTDOWN_SEMAPHORE;
+import static org.apache.tika.pipes.async.AsyncWorker.prepareInsertErrorLog;
+import static org.apache.tika.pipes.async.AsyncWorker.prepareReset;
+import static org.apache.tika.pipes.async.AsyncWorker.reportAndReset;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -44,52 +42,37 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import static org.apache.tika.pipes.async.AsyncTask.SHUTDOWN_SEMAPHORE;
-import static org.apache.tika.pipes.async.AsyncWorker.prepareInsertErrorLog;
-import static org.apache.tika.pipes.async.AsyncWorker.prepareReset;
-import static org.apache.tika.pipes.async.AsyncWorker.reportAndReset;
-
-public class AsyncWorkerProcess {
-
- enum TASK_STATUS_CODES {
- AVAILABLE,
- SELECTED,
- IN_PROCESS,
- AVAILABLE_EMIT,
- SELECTED_EMIT,
- IN_PROCESS_EMIT,
- FAILED_EMIT,
- EMITTED
- }
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import net.jpountz.lz4.LZ4Factory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
- public enum WORKER_STATUS_CODES {
- ACTIVE,
- RESTARTING,
- HIBERNATING,
- SHOULD_SHUTDOWN,
- SHUTDOWN
- }
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.metadata.serialization.JsonMetadataSerializer;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.sax.BasicContentHandlerFactory;
+import org.apache.tika.sax.RecursiveParserWrapperHandler;
+import org.apache.tika.utils.StringUtils;
- enum ERROR_CODES {
- TIMEOUT,
- SECURITY_EXCEPTION,
- OTHER_EXCEPTION,
- OOM,
- OTHER_ERROR,
- UNKNOWN_PARSE,
- EMIT_SERIALIZATION,
- EMIT_SQL_INSERT_EXCEPTION,
- EMIT_SQL_SELECT_EXCEPTION,
- EMIT_DESERIALIZATION,
- EMIT_EXCEPTION
- }
+public class AsyncWorkerProcess {
private static final Logger LOG = LoggerFactory.getLogger(AsyncWorkerProcess.class);
-
//make these all configurable
private static final long SHUTDOWN_AFTER_MS = 120000;
- private static long PULSE_MS = 1000;
- private long parseTimeoutMs = 60000;
+ private static final long PULSE_MS = 1000;
+ private final long parseTimeoutMs = 60000;
public static void main(String[] args) throws Exception {
Path tikaConfigPath = Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY));
@@ -104,15 +87,15 @@ public class AsyncWorkerProcess {
System.exit(0);
}
- private void execute(Connection connection,
- int workerId, TikaConfig tikaConfig) throws SQLException {
-
+ private void execute(Connection connection, int workerId, TikaConfig tikaConfig)
+ throws SQLException {
+ //3 = worker + forkwatcher + active task
ExecutorService service = Executors.newFixedThreadPool(3);
ExecutorCompletionService<Integer> executorCompletionService =
new ExecutorCompletionService<>(service);
- executorCompletionService.submit(new Worker(connection, workerId,
- tikaConfig, parseTimeoutMs));
+ executorCompletionService
+ .submit(new Worker(connection, workerId, tikaConfig, parseTimeoutMs));
executorCompletionService.submit(new ForkWatcher(System.in));
int completed = 0;
@@ -135,6 +118,23 @@ public class AsyncWorkerProcess {
return;
}
+ enum TASK_STATUS_CODES {
+ AVAILABLE, SELECTED, IN_PROCESS, AVAILABLE_EMIT, SELECTED_EMIT, IN_PROCESS_EMIT,
+ FAILED_EMIT, EMITTED
+ }
+
+ public enum WORKER_STATUS_CODES {
+ ACTIVE, RESTARTING, HIBERNATING, CAN_SHUTDOWN, //if there's nothing else to process, shutdown
+ SHOULD_SHUTDOWN, //shutdown now whether or not there's anything else to process
+ HAS_SHUTDOWN
+ }
+
+ enum ERROR_CODES {
+ TIMEOUT, SECURITY_EXCEPTION, OTHER_EXCEPTION, OOM, OTHER_ERROR, UNKNOWN_PARSE, MAX_RETRIES,
+ EMIT_SERIALIZATION, EMIT_SQL_INSERT_EXCEPTION, EMIT_SQL_SELECT_EXCEPTION,
+ EMIT_DESERIALIZATION, EMIT_EXCEPTION
+ }
+
private static class TaskQueue {
private final Connection connection;
private final int workerId;
@@ -149,28 +149,22 @@ public class AsyncWorkerProcess {
this.connection = connection;
this.workerId = workerId;
//TODO -- need to update timestamp
- String sql = "update task_queue set status=" +
- TASK_STATUS_CODES.SELECTED.ordinal()+
- ", time_stamp=CURRENT_TIMESTAMP()"+
- " where id = " +
- " (select id from task_queue where worker_id = " + workerId +
- " and status="+ TASK_STATUS_CODES.AVAILABLE.ordinal()+
+ String sql = "update task_queue set status=" + TASK_STATUS_CODES.SELECTED.ordinal() +
+ ", time_stamp=CURRENT_TIMESTAMP()" + " where id = " +
+ " (select id from task_queue where worker_id = " + workerId + " and status=" +
+ TASK_STATUS_CODES.AVAILABLE.ordinal() +
" order by time_stamp asc limit 1 for update)";
markForSelecting = connection.prepareStatement(sql);
sql = "select id, retry, json from task_queue where status=" +
- TASK_STATUS_CODES.SELECTED.ordinal() +
- " and " +
- " worker_id=" + workerId +
+ TASK_STATUS_CODES.SELECTED.ordinal() + " and " + " worker_id=" + workerId +
" order by time_stamp asc limit 1";
selectForProcessing = connection.prepareStatement(sql);
- sql = "update task_queue set status="+
- TASK_STATUS_CODES.IN_PROCESS.ordinal()+
- ", time_stamp=CURRENT_TIMESTAMP()"+
- " where id=?";
+ sql = "update task_queue set status=" + TASK_STATUS_CODES.IN_PROCESS.ordinal() +
+ ", time_stamp=CURRENT_TIMESTAMP()" + " where id=?";
markForProcessing = connection.prepareStatement(sql);
- sql = "select count(1) from workers where worker_id=" + workerId +
- " and status="+WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
+ sql = "select count(1) from workers where worker_id=" + workerId + " and status=" +
+ WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
checkForShutdown = connection.prepareStatement(sql);
}
@@ -183,7 +177,7 @@ public class AsyncWorkerProcess {
}
int i = markForSelecting.executeUpdate();
if (i == 0) {
- debugQueue();
+// debugQueue();
Thread.sleep(PULSE_MS);
} else {
long taskId = -1;
@@ -213,13 +207,12 @@ public class AsyncWorkerProcess {
}
private void debugQueue() throws SQLException {
- try (ResultSet rs = connection.createStatement().executeQuery(
- "select * from task_queue limit 10")) {
+ try (ResultSet rs = connection.createStatement()
+ .executeQuery("select id, status, worker_id from task_queue limit 10")) {
while (rs.next()) {
- for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
- System.out.print(rs.getString(i)+ " ");
- }
- System.out.println("");
+ System.out.println(
+ "id: " + rs.getInt(1) + " status: " + rs.getInt(2) + " worker_id: " +
+ rs.getInt(3));
}
}
}
@@ -243,18 +236,17 @@ public class AsyncWorkerProcess {
private final RecursiveParserWrapper parser;
private final TikaConfig tikaConfig;
private final long parseTimeoutMs;
- private ExecutorService executorService;
- private ExecutorCompletionService<AsyncData> executorCompletionService;
private final PreparedStatement insertErrorLog;
private final PreparedStatement resetStatus;
private final PreparedStatement insertEmitData;
private final PreparedStatement updateStatusForEmit;
private final ObjectMapper objectMapper = new ObjectMapper();
LZ4Factory factory = LZ4Factory.fastestInstance();
+ private final ExecutorService executorService;
+ private final ExecutorCompletionService<AsyncData> executorCompletionService;
- public Worker(Connection connection,
- int workerId,
- TikaConfig tikaConfig, long parseTimeoutMs) throws SQLException {
+ public Worker(Connection connection, int workerId, TikaConfig tikaConfig,
+ long parseTimeoutMs) throws SQLException {
this.connection = connection;
this.workerId = workerId;
this.parser = new RecursiveParserWrapper(tikaConfig.getParser());
@@ -264,22 +256,24 @@ public class AsyncWorkerProcess {
this.parseTimeoutMs = parseTimeoutMs;
SimpleModule module = new SimpleModule();
- module.addSerializer(Metadata.class, new JsonMetadata());
+ module.addSerializer(Metadata.class, new JsonMetadataSerializer());
objectMapper.registerModule(module);
- String sql = "insert into workers (worker_id, status) " +
- "values (" + workerId + ", "+
- WORKER_STATUS_CODES.ACTIVE.ordinal()+")";
+ String sql = "merge into workers key (worker_id) " + "values (" + workerId + ", " +
+ WORKER_STATUS_CODES.ACTIVE.ordinal() + ")";
connection.createStatement().execute(sql);
insertErrorLog = prepareInsertErrorLog(connection);
resetStatus = prepareReset(connection);
insertEmitData = prepareInsertEmitData(connection);
- sql = "update task_queue set status="+
- TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal()+
- ", time_stamp=CURRENT_TIMESTAMP()" +
- " where id=?";
+ sql = "update task_queue set status=" + TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal() +
+ ", time_stamp=CURRENT_TIMESTAMP()" + " where id=?";
updateStatusForEmit = connection.prepareStatement(sql);
}
+ static PreparedStatement prepareInsertEmitData(Connection connection) throws SQLException {
+ return connection.prepareStatement(
+ "insert into emits (id, time_stamp, uncompressed_size, bytes) " +
+ " values (?,CURRENT_TIMESTAMP(),?,?)");
+ }
public Integer call() throws Exception {
AsyncTask task = null;
@@ -306,26 +300,22 @@ public class AsyncWorkerProcess {
}
}
} catch (TimeoutException e) {
- LOG.warn(task.getFetchKey().getKey(), e);
- reportAndReset(task, ERROR_CODES.TIMEOUT,
- insertErrorLog, resetStatus, LOG);
+ LOG.warn(task.getFetchKey().getFetchKey(), e);
+ reportAndReset(task, ERROR_CODES.TIMEOUT, insertErrorLog, resetStatus, LOG);
} catch (SecurityException e) {
- LOG.warn(task.getFetchKey().getKey(), e);
- reportAndReset(task, ERROR_CODES.SECURITY_EXCEPTION,
- insertErrorLog, resetStatus, LOG);
+ LOG.warn(task.getFetchKey().getFetchKey(), e);
+ reportAndReset(task, ERROR_CODES.SECURITY_EXCEPTION, insertErrorLog, resetStatus,
+ LOG);
} catch (Exception e) {
e.printStackTrace();
- LOG.warn(task.getFetchKey().getKey(), e);
- reportAndReset(task, ERROR_CODES.OTHER_EXCEPTION,
- insertErrorLog, resetStatus, LOG);
+ LOG.warn(task.getFetchKey().getFetchKey(), e);
+ reportAndReset(task, ERROR_CODES.OTHER_EXCEPTION, insertErrorLog, resetStatus, LOG);
} catch (OutOfMemoryError e) {
- LOG.warn(task.getFetchKey().getKey(), e);
- reportAndReset(task, ERROR_CODES.OOM,
- insertErrorLog, resetStatus, LOG);
+ LOG.warn(task.getFetchKey().getFetchKey(), e);
+ reportAndReset(task, ERROR_CODES.OOM, insertErrorLog, resetStatus, LOG);
} catch (Error e) {
- LOG.warn(task.getFetchKey().getKey(), e);
- reportAndReset(task, ERROR_CODES.OTHER_ERROR,
- insertErrorLog, resetStatus, LOG);
+ LOG.warn(task.getFetchKey().getFetchKey(), e);
+ reportAndReset(task, ERROR_CODES.OTHER_ERROR, insertErrorLog, resetStatus, LOG);
} finally {
executorService.shutdownNow();
return 1;
@@ -338,15 +328,16 @@ public class AsyncWorkerProcess {
LOG.debug("received shutdown notification");
return;
} else {
- executorCompletionService.submit(new TaskProcessor(task, tikaConfig, parser,
- workerId));
- Future<AsyncData> future = executorCompletionService.poll(parseTimeoutMs, TimeUnit.MILLISECONDS);
+ executorCompletionService
+ .submit(new TaskProcessor(task, tikaConfig, parser, workerId));
+ Future<AsyncData> future =
+ executorCompletionService.poll(parseTimeoutMs, TimeUnit.MILLISECONDS);
if (future == null) {
- handleTimeout(task.getTaskId(), task.getFetchKey().getKey());
+ handleTimeout(task.getTaskId(), task.getFetchKey().getFetchKey());
} else {
AsyncData asyncData = future.get(1000, TimeUnit.MILLISECONDS);
if (asyncData == null) {
- handleTimeout(task.getTaskId(), task.getFetchKey().getKey());
+ handleTimeout(task.getTaskId(), task.getFetchKey().getFetchKey());
}
boolean shouldEmit = checkForParseException(asyncData);
if (shouldEmit) {
@@ -354,13 +345,11 @@ public class AsyncWorkerProcess {
emit(asyncData);
} catch (JsonProcessingException e) {
e.printStackTrace();
- recordBadEmit(task.getTaskId(),
- task.getFetchKey().getKey(),
+ recordBadEmit(task.getTaskId(), task.getFetchKey().getFetchKey(),
ERROR_CODES.EMIT_SERIALIZATION.ordinal());
} catch (SQLException e) {
e.printStackTrace();
- recordBadEmit(task.getTaskId(),
- task.getFetchKey().getKey(),
+ recordBadEmit(task.getTaskId(), task.getFetchKey().getFetchKey(),
ERROR_CODES.EMIT_SQL_INSERT_EXCEPTION.ordinal());
}
}
@@ -372,18 +361,16 @@ public class AsyncWorkerProcess {
//stub
}
- private void emit(AsyncData asyncData) throws SQLException,
- JsonProcessingException {
+ private void emit(AsyncData asyncData) throws SQLException, JsonProcessingException {
insertEmitData.clearParameters();
- insertEmitData.setLong(1, asyncData.getAsyncTask().getTaskId());
+ insertEmitData.setLong(1, asyncData.getTaskId());
byte[] bytes = objectMapper.writeValueAsBytes(asyncData);
byte[] compressed = factory.fastCompressor().compress(bytes);
insertEmitData.setLong(2, bytes.length);
insertEmitData.setBlob(3, new ByteArrayInputStream(compressed));
insertEmitData.execute();
updateStatusForEmit.clearParameters();
- updateStatusForEmit.setLong(1,
- asyncData.getAsyncTask().getTaskId());
+ updateStatusForEmit.setLong(1, asyncData.getTaskId());
updateStatusForEmit.execute();
}
@@ -392,12 +379,10 @@ public class AsyncWorkerProcess {
throw new TimeoutException(key);
}
-
private boolean checkForParseException(AsyncData asyncData) {
if (asyncData == null || asyncData.getMetadataList() == null ||
asyncData.getMetadataList().size() == 0) {
- LOG.warn("empty or null emit data ({})", asyncData.getAsyncTask()
- .getFetchKey().getKey());
+ LOG.warn("empty or null emit data ({})", asyncData.getFetchKey().getFetchKey());
return false;
}
boolean shouldEmit = true;
@@ -405,9 +390,8 @@ public class AsyncWorkerProcess {
String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
if (stack != null) {
LOG.warn("fetchKey ({}) container parse exception ({})",
- asyncData.getAsyncTask().getFetchKey().getKey(), stack);
- if (asyncData.getAsyncTask().getOnParseException()
- == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
+ asyncData.getFetchKey().getFetchKey(), stack);
+ if (asyncData.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
shouldEmit = false;
}
}
@@ -417,18 +401,11 @@ public class AsyncWorkerProcess {
String embeddedStack = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
if (embeddedStack != null) {
LOG.warn("fetchKey ({}) embedded parse exception ({})",
- asyncData.getAsyncTask().getFetchKey().getKey(), embeddedStack);
+ asyncData.getFetchKey().getFetchKey(), embeddedStack);
}
}
return shouldEmit;
}
-
- static PreparedStatement prepareInsertEmitData(Connection connection) throws SQLException {
- return connection.prepareStatement(
- "insert into emits (id, time_stamp, uncompressed_size, bytes) " +
- " values (?,CURRENT_TIMESTAMP(),?,?)"
- );
- }
}
private static class TaskProcessor implements Callable<AsyncData> {
@@ -438,9 +415,7 @@ public class AsyncWorkerProcess {
private final TikaConfig tikaConfig;
private final int workerId;
- public TaskProcessor(AsyncTask task,
- TikaConfig tikaConfig,
- Parser parser, int workerId) {
+ public TaskProcessor(AsyncTask task, TikaConfig tikaConfig, Parser parser, int workerId) {
this.task = task;
this.parser = parser;
this.tikaConfig = tikaConfig;
@@ -451,14 +426,11 @@ public class AsyncWorkerProcess {
Metadata userMetadata = task.getMetadata();
Metadata metadata = new Metadata();
String fetcherName = task.getFetchKey().getFetcherName();
- String fetchKey = task.getFetchKey().getKey();
+ String fetchKey = task.getFetchKey().getFetchKey();
List<Metadata> metadataList = null;
- try (InputStream stream = tikaConfig.getFetcherManager()
- .getFetcher(fetcherName)
+ try (InputStream stream = tikaConfig.getFetcherManager().getFetcher(fetcherName)
.fetch(fetchKey, metadata)) {
- metadataList = parseMetadata(task.getFetchKey(),
- stream,
- metadata);
+ metadataList = parseMetadata(task.getFetchKey(), stream, metadata);
} catch (SecurityException e) {
throw e;
}
@@ -468,11 +440,12 @@ public class AsyncWorkerProcess {
emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
task.setEmitKey(emitKey);
}
- return new AsyncData(task, metadataList);
+ return new AsyncData(task.getTaskId(), task.getFetchKey(), task.getEmitKey(),
+ task.getOnParseException(), metadataList);
}
- private List<Metadata> parseMetadata(FetchKey fetchKey,
- InputStream stream, Metadata metadata) {
+ private List<Metadata> parseMetadata(FetchKey fetchKey, InputStream stream,
+ Metadata metadata) {
//make these configurable
BasicContentHandlerFactory.HANDLER_TYPE type =
BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
@@ -486,16 +459,16 @@ public class AsyncWorkerProcess {
try {
parser.parse(stream, handler, metadata, parseContext);
} catch (SAXException e) {
- LOG.warn("problem:" + fetchKey.getKey(), e);
+ LOG.warn("problem:" + fetchKey.getFetchKey(), e);
} catch (EncryptedDocumentException e) {
- LOG.warn("encrypted:" + fetchKey.getKey(), e);
+ LOG.warn("encrypted:" + fetchKey.getFetchKey(), e);
} catch (SecurityException e) {
- LOG.warn("security exception: " + fetchKey.getKey());
+ LOG.warn("security exception: " + fetchKey.getFetchKey());
throw e;
} catch (Exception e) {
- LOG.warn("exception: " + fetchKey.getKey());
+ LOG.warn("exception: " + fetchKey.getFetchKey());
} catch (OutOfMemoryError e) {
- LOG.error("oom: " + fetchKey.getKey());
+ LOG.error("oom: " + fetchKey.getFetchKey());
throw e;
}
return handler.getMetadataList();
@@ -514,6 +487,7 @@ public class AsyncWorkerProcess {
private static class ForkWatcher implements Callable<Integer> {
private final InputStream in;
+
public ForkWatcher(InputStream in) {
this.in = in;
}
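
Worker.emit() above stores the JSON-serialized AsyncData LZ4-compressed together
with its uncompressed size; the size column exists because lz4-java's fast
decompressor must be told the destination length up front. A round-trip sketch
using the same lz4-java calls (class name and payload string are stand-ins):

    import java.nio.charset.StandardCharsets;

    import net.jpountz.lz4.LZ4Compressor;
    import net.jpountz.lz4.LZ4Factory;
    import net.jpountz.lz4.LZ4FastDecompressor;

    public class Lz4RoundTripSketch {
        public static void main(String[] args) {
            byte[] bytes = "{\"metadataList\":[]}".getBytes(StandardCharsets.UTF_8);
            LZ4Factory factory = LZ4Factory.fastestInstance();
            LZ4Compressor compressor = factory.fastCompressor();
            byte[] compressed = compressor.compress(bytes);
            // the reader must know the original length to size its buffer,
            // which is why the emits table stores uncompressed_size
            LZ4FastDecompressor decompressor = factory.fastDecompressor();
            byte[] restored = decompressor.decompress(compressed, bytes.length);
            System.out.println(new String(restored, StandardCharsets.UTF_8));
        }
    }
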
diff --git a/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties b/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
index 585b03b..43553a4 100644
--- a/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
+++ b/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
#info,debug, error,fatal ...
log4j.rootLogger=debug,stderr
-
#console
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.Target=System.err
-
log4j.appender.stderr.layout.ConversionPattern=%d{ABSOLUTE} %-5p %m%n
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
deleted file mode 100644
index c653267..0000000
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.tika.pipes.async;
-
-import org.junit.Test;
-
-public class AsyncCliTest {
- @Test
- public void testBasic() throws Exception {
- String[] args = {
- "/Users/allison/Desktop/tika-tmp/tika-config.xml"
- };
- AsyncCli.main(args);
- }
-
- @Test
- public void testUnhandled() throws InterruptedException {
- Thread t = new Thread(new Task());
-
- t.start();
- t.join();
- for (StackTraceElement el : t.getStackTrace()) {
- System.out.println(el);
- }
- }
-
- private static class Task implements Runnable {
-
- @Override
- public void run() {
- Thread.currentThread().setUncaughtExceptionHandler(new MyUncaught());
- for (int i = 0; i < 5; i++) {
- System.out.println(i);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- throw new RuntimeException();
- }
- }
- throw new RuntimeException("kaboom");
- }
- }
-
- private static class MyUncaught implements Thread.UncaughtExceptionHandler {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- throw new RuntimeException("bad");
- }
- }
-}
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 920634e..04890b3 100644
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -1,13 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.pipes.async;
-import org.apache.commons.io.FileUtils;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -18,6 +28,18 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
public class AsyncProcessorTest {
@@ -30,29 +52,21 @@ public class AsyncProcessorTest {
public void setUp() throws SQLException, IOException {
dbDir = Files.createTempDirectory("async-db");
dbFile = dbDir.resolve("emitted-db");
- String jdbc = "jdbc:h2:file:"+dbFile.toAbsolutePath().toString()+";AUTO_SERVER=TRUE";
- String sql = "create table emitted (id int auto_increment primary key, json varchar(20000))";
+ String jdbc = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
+ String sql = "create table emitted (id int auto_increment primary key, " +
+ "emitkey varchar(2000), json varchar(20000))";
connection = DriverManager.getConnection(jdbc);
connection.createStatement().execute(sql);
tikaConfigPath = dbDir.resolve("tika-config.xml");
- String xml = "" +
- "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" +
- "<properties>" +
- " <emitters>"+
- " <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
- " <params>\n" +
- " <param name=\"name\" type=\"string\">mock</param>\n"+
- " <param name=\"jdbc\" type=\"string\">"+jdbc+"</param>\n"+
- " </params>" +
- " </emitter>" +
- " </emitters>"+
- " <fetchers>" +
+ String xml = "" + "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<properties>" +
+ " <emitters>" + " <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
+ " <params>\n" + " <param name=\"name\" type=\"string\">mock</param>\n" +
+ " <param name=\"jdbc\" type=\"string\">" + jdbc + "</param>\n" +
+ " </params>" + " </emitter>" + " </emitters>" + " <fetchers>" +
" <fetcher class=\"org.apache.tika.pipes.async.MockFetcher\">" +
- " <param name=\"name\" type=\"string\">mock</param>\n"+
- " </fetcher>" +
- " </fetchers>"+
- "</properties>";
+ " <param name=\"name\" type=\"string\">mock</param>\n" + " </fetcher>" +
+ " </fetchers>" + "</properties>";
Files.write(tikaConfigPath, xml.getBytes(StandardCharsets.UTF_8));
}
@@ -68,21 +82,24 @@ public class AsyncProcessorTest {
AsyncProcessor processor = AsyncProcessor.build(tikaConfigPath);
- for (int i = 0 ; i < 100; i++) {
- FetchEmitTuple t = new FetchEmitTuple(
- new FetchKey("mock", "key-"+i),
- new EmitKey("mock", "emit-"+i),
- new Metadata()
- );
+ int max = 100;
+ for (int i = 0; i < max; i++) {
+ FetchEmitTuple t = new FetchEmitTuple(new FetchKey("mock", "key-" + i),
+ new EmitKey("mock", "emit-" + i), new Metadata());
processor.offer(t, 1000);
}
processor.close();
- String sql = "select * from emitted";
- try (Statement st = connection.createStatement();
- ResultSet rs = st.executeQuery(sql)) {
+ String sql = "select emitkey from emitted";
+ Set<String> emitKeys = new HashSet<>();
+ try (Statement st = connection.createStatement(); ResultSet rs = st.executeQuery(sql)) {
while (rs.next()) {
- System.out.println(rs.getInt(1) + " : "+rs.getString(2));
+ String emitKey = rs.getString(1);
+ emitKeys.add(emitKey);
}
}
+ assertEquals(max, emitKeys.size());
+ for (int i = 0; i < max; i++) {
+ assertTrue(emitKeys.contains("emit-" + i));
+ }
}
}
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
index 9f07dec..5576f73 100644
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
+++ b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
@@ -1,32 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.pipes.async;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonMetadataSerializer;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.emitter.Emitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
public class MockEmitter implements Initializable, Emitter {
private final ObjectMapper objectMapper = new ObjectMapper();
private Connection connection;
private String jdbc;
private PreparedStatement insert;
+ public MockEmitter() {
+ SimpleModule module = new SimpleModule();
+ module.addSerializer(Metadata.class, new JsonMetadataSerializer());
+ objectMapper.registerModule(module);
+ }
+
@Field
public void setJdbc(String jdbc) {
this.jdbc = jdbc;
@@ -38,10 +63,10 @@ public class MockEmitter implements Initializable, Emitter {
}
@Override
- public void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException {
- emit(Collections.singletonList(
- new EmitData(
- new EmitKey(getName(), emitKey), metadataList)));
+ public void emit(String emitKey, List<Metadata> metadataList)
+ throws IOException, TikaEmitterException {
+ emit(Collections
+ .singletonList(new EmitData(new EmitKey(getName(), emitKey), metadataList)));
}
@Override
@@ -50,7 +75,8 @@ public class MockEmitter implements Initializable, Emitter {
String json = objectMapper.writeValueAsString(d);
try {
insert.clearParameters();
- insert.setString(1, json);
+ insert.setString(1, d.getEmitKey().getEmitKey());
+ insert.setString(2, json);
insert.execute();
} catch (SQLException e) {
throw new TikaEmitterException("problem inserting", e);
@@ -62,7 +88,7 @@ public class MockEmitter implements Initializable, Emitter {
public void initialize(Map<String, Param> params) throws TikaConfigException {
try {
connection = DriverManager.getConnection(jdbc);
- String sql = "insert into emitted (json) values (?)";
+ String sql = "insert into emitted (emitkey, json) values (?, ?)";
insert = connection.prepareStatement(sql);
} catch (SQLException e) {
throw new TikaConfigException("problem w connection", e);
@@ -70,7 +96,8 @@ public class MockEmitter implements Initializable, Emitter {
}
@Override
- public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+ public void checkInitialization(InitializableProblemHandler problemHandler)
+ throws TikaConfigException {
}
}
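
MockEmitter now registers JsonMetadataSerializer so its ObjectMapper can render
Tika Metadata objects as the flat name/value JSON seen in SerializationTest below;
Jackson's default introspection would not produce that shape. A minimal sketch of
the same wiring (class name is made up; output shape inferred from the test JSON
later in this commit):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.module.SimpleModule;

    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.metadata.serialization.JsonMetadataSerializer;

    public class MetadataJsonSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            SimpleModule module = new SimpleModule();
            module.addSerializer(Metadata.class, new JsonMetadataSerializer());
            mapper.registerModule(module);

            Metadata metadata = new Metadata();
            metadata.set("dc:creator", "Nikolai Lobachevsky");
            // prints a flat JSON object, e.g. {"dc:creator":"Nikolai Lobachevsky"}
            System.out.println(mapper.writeValueAsString(metadata));
        }
    }
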
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
index 8c99ecf..7a74f2a 100644
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
+++ b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
@@ -1,21 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.pipes.async;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.Fetcher;
-
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.fetcher.Fetcher;
+
public class MockFetcher implements Fetcher {
- private static byte[] BYTES = new String("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>"+
- "<mock>"+
- "<metadata action=\"add\" name=\"dc:creator\">Nikolai Lobachevsky</metadata>"+
- "<write element=\"p\">main_content</write>"+
- "</mock>").getBytes(StandardCharsets.UTF_8);
+ private static final byte[] BYTES = ("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<mock>" +
+ "<metadata action=\"add\" name=\"dc:creator\">Nikolai Lobachevsky</metadata>" +
+ "<write element=\"p\">main_content</write>" + "</mock>")
+ .getBytes(StandardCharsets.UTF_8);
@Override
public String getName() {
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java
new file mode 100644
index 0000000..ebbd8f0
--- /dev/null
+++ b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import static org.junit.Assert.assertEquals;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.junit.Test;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonMetadataDeserializer;
+
+public class SerializationTest {
+
+ @Test
+ public void testBasic() throws Exception {
+ String json = "{\"taskId\":49,\"fetchKey\":{\"fetcherName\":\"mock\"," +
+ "\"fetchKey\":\"key-48\"},\"emitKey\":{\"emitterName\":\"mock\"," +
+ "\"emitKey\":\"emit-48\"},\"onParseException\":\"EMIT\",\"metadataList\":" +
+ "[{\"X-TIKA:Parsed-By\":" +
+ "\"org.apache.tika.parser.EmptyParser\",\"X-TIKA:parse_time_millis\":" +
+ "\"0\",\"X-TIKA:embedded_depth\":\"0\"}]}";
+
+ ObjectMapper mapper = new ObjectMapper();
+ SimpleModule module = new SimpleModule();
+ module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
+ mapper.registerModule(module);
+ AsyncData asyncData = mapper.readValue(json, AsyncData.class);
+ assertEquals(49, asyncData.getTaskId());
+ assertEquals("mock", asyncData.getFetchKey().getFetcherName());
+ assertEquals(1, asyncData.getMetadataList().size());
+ }
+
+}
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
deleted file mode 100644
index 394fb16..0000000
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package org.apache.tika.pipes.async;
-
-
-import org.apache.commons.io.FileUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class TestPipesDriver {
-
- static Path TMP_DIR;
- static Path DB;
-
- static AtomicInteger PROCESSED = new AtomicInteger(0);
-
- @BeforeClass
- public static void setUp() throws Exception {
- TMP_DIR = Files.createTempDirectory("pipes-driver-");
- DB = Files.createTempFile(TMP_DIR, "", "");
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- FileUtils.deleteDirectory(TMP_DIR.toFile());
- }
-
-
- @Test
- public void testQueue() throws Exception {
- int numThreads = 20;
- ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10000 + numThreads);
- for (int i = 0; i < 10000; i++) {
- queue.add(1);
- }
- for (int i = 0; i < numThreads; i++) {
- queue.offer(-1);
- }
- ExecutorService service = Executors.newFixedThreadPool(numThreads);
- ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(service);
-
- long start = System.currentTimeMillis();
- executorCompletionService.submit(new Watcher(queue));
- for (int i = 0; i < numThreads; i++) {
- executorCompletionService.submit(new QueueWorker(queue));
- }
- int finished = 0;
- while (finished++ < numThreads) {
- executorCompletionService.take();
- }
- long elapsed = System.currentTimeMillis() - start;
- System.out.println("elapsed: " + elapsed);
- service.shutdownNow();
- }
-
- private static class Watcher implements Callable<Integer> {
- private final ArrayBlockingQueue<Integer> queue;
-
- Watcher(ArrayBlockingQueue<Integer> queue) {
- this.queue = queue;
- }
-
- @Override
- public Integer call() throws Exception {
- long start = System.currentTimeMillis();
- while (true) {
- long elapsed = System.currentTimeMillis() - start;
- Thread.sleep(1000);
- }
- }
- }
-
- private static class QueueWorker implements Callable<Integer> {
- static AtomicInteger counter = new AtomicInteger(0);
-
-
- private final int id;
- private final ArrayBlockingQueue<Integer> queue;
-
- QueueWorker(ArrayBlockingQueue<Integer> queue) {
- id = counter.incrementAndGet();
- this.queue = queue;
- }
-
- @Override
- public Integer call() throws Exception {
- while (true) {
- Integer val = queue.poll(1, TimeUnit.SECONDS);
- if (val != null) {
- if (val < 0) {
- return 1;
- } else {
- long sleep = id * 100;
- Thread.sleep(sleep);
- }
- }
- }
- }
- }
-}
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 332813f..7fbba83 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -17,23 +17,6 @@
package org.apache.tika.pipes;
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.iterable.S3Objects;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.s3.S3Emitter;
-import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.junit.Ignore;
-import org.junit.Test;
-
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
@@ -49,6 +32,24 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.iterable.S3Objects;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.s3.S3Emitter;
+import org.apache.tika.pipes.fetcher.Fetcher;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+
@Ignore("turn these into actual tests")
public class PipeIntegrationTests {
@@ -59,10 +60,8 @@ public class PipeIntegrationTests {
String region = "";
String profile = "";
String bucket = "";
- AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
- .withRegion(region)
- .withCredentials(new ProfileCredentialsProvider(profile))
- .build();
+ AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region)
+ .withCredentials(new ProfileCredentialsProvider(profile)).build();
s3Client.listObjects(bucket);
int cnt = 0;
long sz = 0;
@@ -72,17 +71,18 @@ public class PipeIntegrationTests {
if (Files.isRegularFile(targ)) {
continue;
}
- if (! Files.isDirectory(targ.getParent())) {
+ if (!Files.isDirectory(targ.getParent())) {
Files.createDirectories(targ.getParent());
}
- System.out.println("id: " + cnt + " :: " + summary.getKey() + " : " + summary.getSize());
+ System.out
+ .println("id: " + cnt + " :: " + summary.getKey() + " : " + summary.getSize());
S3Object s3Object = s3Client.getObject(bucket, summary.getKey());
Files.copy(s3Object.getObjectContent(), targ);
summary.getSize();
cnt++;
sz += summary.getSize();
}
- System.out.println("iterated: "+cnt + " sz: "+sz);
+ System.out.println("iterated: " + cnt + " sz: " + sz);
}
@Test
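The hunk above reflows the body of the S3 download loop; the loop header itself sits outside the diff context. Pieced together with the imports restored at the top of this file, the full pattern is roughly as follows (using S3Objects.inBucket for the unseen loop header is an assumption; region, profile, bucket, and OUTDIR come from the test):

    AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region)
            .withCredentials(new ProfileCredentialsProvider(profile)).build();
    for (S3ObjectSummary summary : S3Objects.inBucket(s3Client, bucket)) {
        Path targ = OUTDIR.resolve(summary.getKey());
        if (!Files.isDirectory(targ.getParent())) {
            Files.createDirectories(targ.getParent());
        }
        // stream each object straight to disk
        S3Object s3Object = s3Client.getObject(bucket, summary.getKey());
        Files.copy(s3Object.getObjectContent(), targ);
    }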
@@ -94,8 +94,8 @@ public class PipeIntegrationTests {
ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(es);
ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(1000);
for (int i = 0; i < numConsumers; i++) {
- completionService.submit(new FSFetcherEmitter(
- queue, tikaConfig.getFetcherManager().getFetcher("s3"), null));
+ completionService.submit(new FSFetcherEmitter(queue,
+ tikaConfig.getFetcherManager().getFetcher("s3"), null));
}
for (FetchEmitTuple t : it) {
queue.offer(t);
@@ -105,7 +105,7 @@ public class PipeIntegrationTests {
}
int finished = 0;
try {
- while (finished++ < numConsumers+1) {
+ while (finished++ < numConsumers + 1) {
Future<Integer> future = completionService.take();
future.get();
}
@@ -122,9 +122,9 @@ public class PipeIntegrationTests {
ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(es);
ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(1000);
for (int i = 0; i < numConsumers; i++) {
- completionService.submit(new S3FetcherEmitter(
- queue, tikaConfig.getFetcherManager().getFetcher("s3f"),
- (S3Emitter)tikaConfig.getEmitterManager().getEmitter("s3e")));
+ completionService.submit(new S3FetcherEmitter(queue,
+ tikaConfig.getFetcherManager().getFetcher("s3f"),
+ (S3Emitter) tikaConfig.getEmitterManager().getEmitter("s3e")));
}
FetchIterator it = tikaConfig.getFetchIterator();
for (FetchEmitTuple t : it) {
@@ -135,7 +135,7 @@ public class PipeIntegrationTests {
}
int finished = 0;
try {
- while (finished++ < numConsumers+1) {
+ while (finished++ < numConsumers + 1) {
Future<Integer> future = completionService.take();
future.get();
}
@@ -145,8 +145,7 @@ public class PipeIntegrationTests {
}
private TikaConfig getConfig(String fileName) throws Exception {
- try (InputStream is =
- PipeIntegrationTests.class.getResourceAsStream("/"+fileName)) {
+ try (InputStream is = PipeIntegrationTests.class.getResourceAsStream("/" + fileName)) {
return new TikaConfig(is);
}
}
@@ -159,8 +158,8 @@ public class PipeIntegrationTests {
private final Emitter emitter;
private final ArrayBlockingQueue<FetchEmitTuple> queue;
- FSFetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher
- fetcher, Emitter emitter) {
+ FSFetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher fetcher,
+ Emitter emitter) {
this.queue = queue;
this.fetcher = fetcher;
this.emitter = emitter;
@@ -182,12 +181,12 @@ public class PipeIntegrationTests {
}
private void process(FetchEmitTuple t) throws IOException, TikaException {
- Path targ = OUTDIR.resolve(t.getFetchKey().getKey());
+ Path targ = OUTDIR.resolve(t.getFetchKey().getFetchKey());
if (Files.isRegularFile(targ)) {
return;
}
- try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(), t.getMetadata())) {
- System.out.println(counter.getAndIncrement() + " : "+t );
+ try (InputStream is = fetcher.fetch(t.getFetchKey().getFetchKey(), t.getMetadata())) {
+ System.out.println(counter.getAndIncrement() + " : " + t);
Files.createDirectories(targ.getParent());
Files.copy(is, targ);
}
@@ -201,8 +200,8 @@ public class PipeIntegrationTests {
private final S3Emitter emitter;
private final ArrayBlockingQueue<FetchEmitTuple> queue;
- S3FetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher
- fetcher, S3Emitter emitter) {
+ S3FetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher fetcher,
+ S3Emitter emitter) {
this.queue = queue;
this.fetcher = fetcher;
this.emitter = emitter;
@@ -227,7 +226,7 @@ public class PipeIntegrationTests {
Metadata userMetadata = new Metadata();
userMetadata.set("project", "my-project");
- try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(), t.getMetadata())) {
+ try (InputStream is = fetcher.fetch(t.getFetchKey().getFetchKey(), t.getMetadata())) {
emitter.emit(t.getEmitKey().getEmitKey(), is, userMetadata);
}
}
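Both tests above share one shape: a bounded queue of FetchEmitTuple work units feeds a pool of fetch-and-emit consumers through an ExecutorCompletionService. Condensed from the hunks above (fetcher/emitter names follow the test config; the loops wait on numConsumers + 1 futures, the extra one presumably being a fetch-iterator task outside the shown context):

    ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(1000);
    ExecutorCompletionService<Integer> completionService =
            new ExecutorCompletionService<>(es);
    for (int i = 0; i < numConsumers; i++) {
        completionService.submit(new S3FetcherEmitter(queue,
                tikaConfig.getFetcherManager().getFetcher("s3f"),
                (S3Emitter) tikaConfig.getEmitterManager().getEmitter("s3e")));
    }
    for (FetchEmitTuple t : tikaConfig.getFetchIterator()) {
        queue.offer(t); // producer side: enqueue one unit of work
    }
    int finished = 0;
    while (finished++ < numConsumers + 1) {
        completionService.take().get(); // rethrows any consumer exception
    }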
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/log4j.properties b/tika-pipes/tika-pipes-integration-tests/src/test/resources/log4j.properties
index 1bee240..2b2da1a 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/resources/log4j.properties
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
#info,debug, error,fatal ...
log4j.rootLogger=info,stderr
-
#console
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.Target=System.err
-
log4j.appender.stderr.layout.ConversionPattern=%-5p [%t]: %m%n
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 3024f6a..78ba9d2 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -133,7 +133,7 @@ public class JsonFetchEmitTuple {
static void writeTuple(FetchEmitTuple t, JsonGenerator jsonGenerator) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField(FETCHER, t.getFetchKey().getFetcherName());
- jsonGenerator.writeStringField(FETCHKEY, t.getFetchKey().getKey());
+ jsonGenerator.writeStringField(FETCHKEY, t.getFetchKey().getFetchKey());
jsonGenerator.writeStringField(EMITTER, t.getEmitKey().getEmitterName());
if (!StringUtils.isBlank(t.getEmitKey().getEmitKey())) {
jsonGenerator.writeStringField(EMITKEY, t.getEmitKey().getEmitKey());
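The one-line change above is the FetchKey.getKey() -> getFetchKey() rename that recurs through the server resources below. For orientation, writeTuple emits a flat JSON object; assuming the FETCHER/FETCHKEY/EMITTER/EMITKEY constants hold the field names their names suggest (an assumption; the constants are defined earlier in this class), a serialized tuple looks something like:

    {"fetcher":"s3f","fetchKey":"docs/report.pdf","emitter":"s3e","emitKey":"docs/report.json"}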
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
index 06612a1..b33c2a9 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
@@ -26,19 +26,13 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
-public class JsonMetadata extends StdSerializer<Metadata> {
+public class JsonMetadata {
static volatile boolean PRETTY_PRINT = false;
- public JsonMetadata() {
- super(Metadata.class);
- }
-
/**
* Serializes a Metadata object to Json. This does not flush or close the writer.
*
@@ -146,10 +140,4 @@ public class JsonMetadata extends StdSerializer<Metadata> {
PRETTY_PRINT = prettyPrint;
}
- @Override
- public void serialize(Metadata metadata,
- JsonGenerator jsonGenerator,
- SerializerProvider serializerProvider) throws IOException {
- writeMetadataObject(metadata, jsonGenerator, false);
- }
}
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataDeserializer.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataDeserializer.java
new file mode 100644
index 0000000..5a9b7d0
--- /dev/null
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataDeserializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import static org.apache.tika.metadata.serialization.JsonMetadata.readMetadataObject;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import org.apache.tika.metadata.Metadata;
+
+public class JsonMetadataDeserializer extends StdDeserializer<Metadata> {
+
+ public JsonMetadataDeserializer() {
+ super(Metadata.class);
+ }
+
+ @Override
+ public Metadata deserialize(JsonParser jsonParser,
+ DeserializationContext deserializationContext)
+ throws IOException, JsonProcessingException {
+ return readMetadataObject(jsonParser);
+ }
+}
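The new deserializer is a thin Jackson databind adapter over the existing readMetadataObject parser. A minimal usage sketch, assuming a stock ObjectMapper; variable names are illustrative:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.module.SimpleModule;

    ObjectMapper mapper = new ObjectMapper();
    SimpleModule module = new SimpleModule();
    module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
    mapper.registerModule(module);
    // json: a string previously written by JsonMetadata/JsonMetadataSerializer
    Metadata metadata = mapper.readValue(json, Metadata.class);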
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataSerializer.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataSerializer.java
new file mode 100644
index 0000000..afcf012
--- /dev/null
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import static org.apache.tika.metadata.serialization.JsonMetadata.writeMetadataObject;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import org.apache.tika.metadata.Metadata;
+
+public class JsonMetadataSerializer extends StdSerializer<Metadata> {
+
+ public JsonMetadataSerializer() {
+ super(Metadata.class);
+ }
+
+ @Override
+ public void serialize(Metadata metadata,
+ JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider) throws IOException {
+ writeMetadataObject(metadata, jsonGenerator, false);
+ }
+}
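This is the write-side counterpart: the StdSerializer override removed from JsonMetadata above now lives in its own class. Registration mirrors the deserializer sketch, under the same assumptions:

    module.addSerializer(Metadata.class, new JsonMetadataSerializer());
    String json = mapper.writeValueAsString(metadata);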
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
index efd9a5c..d68cc11 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
@@ -64,7 +64,7 @@ public class AsyncParser implements Callable<Integer> {
if (!offered) {
//TODO: deal with this
LOG.warn("Failed to add ({}) " + "to emit queue after 10 minutes.",
- request.getFetchKey().getKey());
+ request.getFetchKey().getFetchKey());
}
}
} else {
@@ -76,14 +76,14 @@ public class AsyncParser implements Callable<Integer> {
private boolean checkForParseException(FetchEmitTuple request, EmitData emitData) {
if (emitData == null || emitData.getMetadataList() == null ||
emitData.getMetadataList().size() == 0) {
- LOG.warn("empty or null emit data ({})", request.getFetchKey().getKey());
+ LOG.warn("empty or null emit data ({})", request.getFetchKey().getFetchKey());
return false;
}
boolean shouldEmit = true;
Metadata container = emitData.getMetadataList().get(0);
String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
if (stack != null) {
- LOG.warn("fetchKey ({}) container parse exception ({})", request.getFetchKey().getKey(),
+ LOG.warn("fetchKey ({}) container parse exception ({})", request.getFetchKey().getFetchKey(),
stack);
if (request.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
shouldEmit = false;
@@ -95,7 +95,7 @@ public class AsyncParser implements Callable<Integer> {
String embeddedStack = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
if (embeddedStack != null) {
LOG.warn("fetchKey ({}) embedded parse exception ({})",
- request.getFetchKey().getKey(), embeddedStack);
+ request.getFetchKey().getFetchKey(), embeddedStack);
}
}
return shouldEmit;
@@ -105,7 +105,7 @@ public class AsyncParser implements Callable<Integer> {
Metadata userMetadata = t.getMetadata();
Metadata metadata = new Metadata();
String fetcherName = t.getFetchKey().getFetcherName();
- String fetchKey = t.getFetchKey().getKey();
+ String fetchKey = t.getFetchKey().getFetchKey();
List<Metadata> metadataList = null;
try (InputStream stream = TikaResource.getConfig().getFetcherManager()
.getFetcher(fetcherName).fetch(fetchKey, metadata)) {
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index 47dbc50..001926b 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -138,7 +138,7 @@ public class AsyncResource {
private Map<String, Object> throttle(List<FetchEmitTuple> notAdded, int added) {
List<String> fetchKeys = new ArrayList<>();
for (FetchEmitTuple t : notAdded) {
- fetchKeys.add(t.getFetchKey().getKey());
+ fetchKeys.add(t.getFetchKey().getFetchKey());
}
Map<String, Object> map = new HashMap<>();
map.put("status", "throttled");
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
index 306732a..16aa76e 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
@@ -71,7 +71,7 @@ public class EmitterResource {
//TODO: clean this up?
EmitKey emitKey = t.getEmitKey();
if (StringUtils.isBlank(emitKey.getEmitKey())) {
- emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getKey());
+ emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getFetchKey());
}
return emitKey;
}
@@ -171,7 +171,7 @@ public class EmitterResource {
List<Metadata> metadataList = null;
try (InputStream stream = TikaResource.getConfig().getFetcherManager()
.getFetcher(t.getFetchKey().getFetcherName())
- .fetch(t.getFetchKey().getKey(), metadata)) {
+ .fetch(t.getFetchKey().getFetchKey(), metadata)) {
metadataList = RecursiveMetadataResource
.parseMetadata(stream, metadata, httpHeaders.getRequestHeaders(), info, "text");
@@ -213,7 +213,7 @@ public class EmitterResource {
shouldEmit = false;
}
LOG.warn("fetchKey ({}) caught container parse exception ({})",
- t.getFetchKey().getKey(), stack);
+ t.getFetchKey().getFetchKey(), stack);
}
for (int i = 1; i < metadataList.size(); i++) {
@@ -221,7 +221,7 @@ public class EmitterResource {
String embeddedStack = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
if (embeddedStack != null) {
LOG.warn("fetchKey ({}) caught embedded parse exception ({})",
- t.getFetchKey().getKey(), embeddedStack);
+ t.getFetchKey().getFetchKey(), embeddedStack);
}
}