You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/03/24 19:53:31 UTC
[tika] branch main updated: TIKA-3707 -- add fetcher and emitter for Azure blob storage
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new fbeab6c TIKA-3707 -- add fetcher and emitter for Azure blob storage
fbeab6c is described below
commit fbeab6c3fa8008427024934fbfb3470308eff09c
Author: tallison <ta...@apache.org>
AuthorDate: Thu Mar 24 15:53:11 2022 -0400
TIKA-3707 -- add fetcher and emitter for Azure blob storage
---
CHANGES.txt | 2 +
.../org/apache/tika/mime/tika-mimetypes.xml | 16 ++
.../org/apache/tika/parser/http/HttpParser.java | 103 +++++++++
.../services/org.apache.tika.parser.Parser | 1 +
.../apache/tika/parser/http/HttpParserTest.java | 39 ++++
.../test/resources/test-documents/http-response | 18 ++
tika-pipes/tika-emitters/pom.xml | 1 +
.../tika-emitters/tika-emitter-az-blob/pom.xml | 122 ++++++++++
.../tika/pipes/emitter/azblob/AZBlobEmitter.java | 255 +++++++++++++++++++++
.../pipes/emitter/azblob/TestAZBlobEmitter.java | 52 +++++
.../test/resources/config/tika-config-az-blob.xml | 30 +++
tika-pipes/tika-fetchers/pom.xml | 1 +
.../tika-fetchers/tika-fetcher-az-blob/pom.xml | 127 ++++++++++
.../tika/pipes/fetcher/azblob/AZBlobFetcher.java | 151 ++++++++++++
.../pipes/fetcher/azblob/TestAZBlobFetcher.java | 53 +++++
.../src/test/resources/tika-config-az-blob.xml | 30 +++
16 files changed, 1001 insertions(+)
diff --git a/CHANGES.txt b/CHANGES.txt
index e577bc3..7d99421 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,8 @@ Release 2.4.0 - ???
* Add detection for Frictionless Data packages and WACZ (TIKA-3696).
+ * Add a fetcher and emitter for Azure blob storage (TIKA-3707).
+
* Upgrade deeplearning4j to 1.0.0-M2 (TIKA-3458 and PR#527).
* Various dependency upgrades, including POI, dl4j, gson, jackson,
diff --git a/tika-core/src/main/resources/org/apache/tika/mime/tika-mimetypes.xml b/tika-core/src/main/resources/org/apache/tika/mime/tika-mimetypes.xml
index 9a141b1..64f7f84 100644
--- a/tika-core/src/main/resources/org/apache/tika/mime/tika-mimetypes.xml
+++ b/tika-core/src/main/resources/org/apache/tika/mime/tika-mimetypes.xml
@@ -6465,6 +6465,22 @@
<sub-class-of type="text/x-tika-text-based-message"/>
</mime-type>
+  <mime-type type="application/x-httpresponse">
+    <!-- sometimes PDFs and other files may include the http headers
+         from their retrieval.
+    -->
+    <magic priority="60">
+      <!-- an "HTTP/" status line at offset 0 AND any one of these common
+           response headers within the first 1000 bytes -->
+      <match value="HTTP/" type="string" offset="0">
+        <match value="\nCache-Control:" type="string" offset="0:1000"/>
+        <match value="\nContent-Type:" type="string" offset="0:1000"/>
+        <match value="\nContent-Length:" type="string" offset="0:1000"/>
+        <match value="\nContent-Disposition:" type="string" offset="0:1000"/>
+        <match value="\nDate:" type="string" offset="0:1000"/>
+        <match value="\nServer:" type="string" offset="0:1000"/>
+      </match>
+    </magic>
+    <!-- sub-class-of belongs to mime-type, not to magic -->
+    <sub-class-of type="text/x-tika-text-based-message"/>
+  </mime-type>
<!-- TODO See TIKA-2723 for discussions on the mime type hierarchy -->
<!-- and best parser structure for these email-like formats -->
<mime-type type="multipart/related">
diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-webarchive-module/src/main/java/org/apache/tika/parser/http/HttpParser.java b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-webarchive-module/src/main/java/org/apache/tika/parser/http/HttpParser.java
new file mode 100644
index 0000000..35c1ea9
--- /dev/null
+++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-webarchive-module/src/main/java/org/apache/tika/parser/http/HttpParser.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.parser.http;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.commons.io.input.CloseShieldInputStream;
+import org.netpreserve.jwarc.LengthedBody;
+import org.netpreserve.jwarc.MessageBody;
+import org.netpreserve.jwarc.MessageHeaders;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.SAXException;
+
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.extractor.EmbeddedDocumentExtractor;
+import org.apache.tika.extractor.EmbeddedDocumentUtil;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.mime.MediaType;
+import org.apache.tika.parser.AbstractParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.sax.XHTMLContentHandler;
+
+/**
+ * Parser for a raw HTTP response (status line, headers and body), e.g. a file that
+ * was saved together with the HTTP headers from its retrieval.
+ * <p>
+ * The headers are read with jwarc's {@link org.netpreserve.jwarc.HttpParser} in
+ * lenient mode. If the response declares a positive {@code Content-Length}, that
+ * body is handed to the {@link EmbeddedDocumentExtractor} as an embedded document.
+ * Responses without a usable {@code Content-Length} (absent or malformed) yield no
+ * embedded document.
+ */
+public class HttpParser extends AbstractParser {
+
+    private static final MediaType MEDIA_TYPE = MediaType.application("x-httpresponse");
+    private static final Set<MediaType> SUPPORTED_TYPES = Collections.singleton(MEDIA_TYPE);
+
+    @Override
+    public Set<MediaType> getSupportedTypes(ParseContext context) {
+        return SUPPORTED_TYPES;
+    }
+
+    /**
+     * Parses the HTTP headers and, when a positive Content-Length is present,
+     * the body as an embedded document.
+     *
+     * @throws EOFException if the stream contains no bytes at all
+     */
+    @Override
+    public void parse(InputStream stream, ContentHandler handler, Metadata metadata,
+                      ParseContext context) throws IOException, SAXException, TikaException {
+        org.netpreserve.jwarc.HttpParser parser = new org.netpreserve.jwarc.HttpParser();
+        parser.lenientRequest();
+        parser.lenientResponse();
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        XHTMLContentHandler xhtml = new XHTMLContentHandler(handler, metadata);
+        xhtml.startDocument();
+
+        // the close shield keeps closing the channel from closing the caller's stream
+        try (ReadableByteChannel channel =
+                     Channels.newChannel(new CloseShieldInputStream(stream))) {
+            // pre-fill the buffer; jwarc is then given both the buffer and the channel
+            int len = channel.read(buffer);
+            buffer.flip();
+            if (len < 0) {
+                throw new EOFException("empty http response");
+            }
+            parser.parse(channel, buffer);
+
+            MessageHeaders messageHeaders = parser.headers();
+            updateMetadata(messageHeaders, metadata);
+            //TODO: check for ok status before continuing?
+            long contentLength = getContentLength(messageHeaders);
+            //TODO: is there a way to handle non-lengthed bodies?
+            if (contentLength > 0) {
+                MessageBody messageBody = LengthedBody.create(channel, buffer, contentLength);
+                Metadata payloadMetadata = new Metadata();
+                try (InputStream messageStream = messageBody.stream()) {
+                    parsePayload(messageStream, xhtml, payloadMetadata, context);
+                }
+            }
+        } finally {
+            xhtml.endDocument();
+        }
+    }
+
+    /**
+     * Returns the declared Content-Length, or 0 when the header is absent or not a
+     * parseable integer. A malformed header must not abort a lenient parse.
+     */
+    private static long getContentLength(MessageHeaders messageHeaders) {
+        return messageHeaders.sole("Content-Length")
+                .map(HttpParser::parseContentLength)
+                .orElse(0L);
+    }
+
+    /** Parses a Content-Length value, mapping unparseable input to 0 (no body). */
+    private static long parseContentLength(String value) {
+        try {
+            return Long.parseLong(value.trim());
+        } catch (NumberFormatException e) {
+            return 0L;
+        }
+    }
+
+    /** Hands the response body to the embedded document extractor, if it accepts it. */
+    private void parsePayload(InputStream stream, ContentHandler handler, Metadata metadata,
+                              ParseContext context) throws IOException, SAXException {
+        EmbeddedDocumentExtractor ex = EmbeddedDocumentUtil.getEmbeddedDocumentExtractor(context);
+        if (ex.shouldParseEmbedded(metadata)) {
+            ex.parseEmbedded(stream, handler, metadata, true);
+        }
+    }
+
+    /** Intended to copy HTTP header values into the container metadata; currently a no-op. */
+    private void updateMetadata(MessageHeaders messageHeaders, Metadata metadata) {
+        //TODO
+        //metadata.set(HttpHeaders.CONTENT_LENGTH, messageHeaders.)
+    }
+}
diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-webarchive-module/src/main/resources/META-INF/services/org.apache.tika.parser.Parser b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-webarchive-module/src/main/resources/META-INF/services/org.apache.tika.parser.Parser
index 8ce5e1b..0877079 100644
--- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-webarchive-module/src/main/resources/META-INF/services/org.apache.tika.parser.Parser
+++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-webarchive-module/src/main/resources/META-INF/services/org.apache.tika.parser.Parser
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+org.apache.tika.parser.http.HttpParser
org.apache.tika.parser.warc.WARCParser
org.apache.tika.parser.wacz.WACZParser
diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-webarchive-module/src/test/java/org/apache/tika/parser/http/HttpParserTest.java b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-webarchive-module/src/test/java/org/apache/tika/parser/http/HttpParserTest.java
new file mode 100644
index 0000000..b5d4c31
--- /dev/null
+++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-webarchive-module/src/test/java/org/apache/tika/parser/http/HttpParserTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.parser.http;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.TikaTest;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+
+public class HttpParserTest extends TikaTest {
+
+ @Test
+ public void testBasic() throws Exception {
+ List<Metadata> metadataList = getRecursiveMetadata("http-response");
+ assertEquals(2, metadataList.size());
+ assertEquals("application/x-httpresponse", metadataList.get(0).get(Metadata.CONTENT_TYPE));
+ assertContains("this is some content",
+ metadataList.get(1).get(TikaCoreProperties.TIKA_CONTENT));
+ }
+}
diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-webarchive-module/src/test/resources/test-documents/http-response b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-webarchive-module/src/test/resources/test-documents/http-response
new file mode 100644
index 0000000..872f65a
--- /dev/null
+++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-webarchive-module/src/test/resources/test-documents/http-response
@@ -0,0 +1,18 @@
+HTTP/1.1 200 OK
+Cache-Control: private
+Pragma: Public
+Content-Type: text/html; charset=UTF-8
+Server: Microsoft-IIS/7.5
+Content-Disposition: inline; filename=something.html
+X-AspNet-Version: 2.0.50727
+X-Powered-By: ASP.NET
+Date: Fri, 18 Sep 2015 17:30:08 GMT
+Content-Length: 136
+
+
+<html>
+ <head>
+ <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
+ </head>
+ <body>this is some content</body>
+</html>
\ No newline at end of file
diff --git a/tika-pipes/tika-emitters/pom.xml b/tika-pipes/tika-emitters/pom.xml
index c9877fe..1637fa9 100644
--- a/tika-pipes/tika-emitters/pom.xml
+++ b/tika-pipes/tika-emitters/pom.xml
@@ -37,6 +37,7 @@
<module>tika-emitter-solr</module>
<module>tika-emitter-opensearch</module>
<module>tika-emitter-gcs</module>
+ <module>tika-emitter-az-blob</module>
</modules>
<scm>
diff --git a/tika-pipes/tika-emitters/tika-emitter-az-blob/pom.xml b/tika-pipes/tika-emitters/tika-emitter-az-blob/pom.xml
new file mode 100644
index 0000000..57a83f7
--- /dev/null
+++ b/tika-pipes/tika-emitters/tika-emitter-az-blob/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>tika-emitters</artifactId>
+ <groupId>org.apache.tika</groupId>
+ <version>2.4.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tika-emitter-az-blob</artifactId>
+  <name>Apache Tika Azure blob emitter</name>
+
+
+ <dependencies>
+ <!-- should serialization be provided or bundled? -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-serialization</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-blob</artifactId>
+ <version>12.14.4</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifestEntries>
+              <!-- must match this module's package, not the gcs emitter's -->
+              <Automatic-Module-Name>org.apache.tika.pipes.emitter.azblob</Automatic-Module-Name>
+            </manifestEntries>
+          </archive>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+ <plugin>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${maven.shade.version}</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>
+ false
+ </createDependencyReducedPom>
+ <!-- <filters> -->
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*</exclude>
+ <exclude>LICENSE.txt</exclude>
+ <exclude>NOTICE.txt</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <resource>META-INF/LICENSE</resource>
+ <file>target/classes/META-INF/LICENSE</file>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <resource>META-INF/NOTICE</resource>
+ <file>target/classes/META-INF/NOTICE</file>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <resource>META-INF/DEPENDENCIES</resource>
+ <file>target/classes/META-INF/DEPENDENCIES</file>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <scm>
+ <tag>2.2.1-rc2</tag>
+ </scm>
+</project>
\ No newline at end of file
diff --git a/tika-pipes/tika-emitters/tika-emitter-az-blob/src/main/java/org/apache/tika/pipes/emitter/azblob/AZBlobEmitter.java b/tika-pipes/tika-emitters/tika-emitter-az-blob/src/main/java/org/apache/tika/pipes/emitter/azblob/AZBlobEmitter.java
new file mode 100644
index 0000000..0f8fbd1
--- /dev/null
+++ b/tika-pipes/tika-emitters/tika-emitter-az-blob/src/main/java/org/apache/tika/pipes/emitter/azblob/AZBlobEmitter.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.azblob;
+
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.azure.core.credential.AzureSasCredential;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.StreamEmitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.utils.StringUtils;
+
+
+/**
+ * Emit files to Azure blob storage. Must set endpoint, sasToken and container via config.
+ * <p>
+ * The blob name is derived from the emit key: {@code prefix} (if non-blank) is prepended
+ * with a "/" separator, and {@code fileExtension} (default {@code json}, if non-blank) is
+ * appended after a ".". User metadata is written to the blob's metadata after the upload.
+ */
+public class AZBlobEmitter extends AbstractEmitter implements Initializable, StreamEmitter {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AZBlobEmitter.class);
+
+    // appended to every blob name (without the "."); blank disables the extension
+    private String fileExtension = "json";
+
+    // prepended to every blob name; stored without a trailing "/"
+    private String prefix = "";
+
+    private String sasToken;
+    private String container;
+    private String endpoint;
+    private BlobServiceClient blobServiceClient;
+    private BlobContainerClient blobContainerClient;
+    // passed as the "overwrite" flag of the client's upload calls
+    private boolean overwriteExisting = false;
+
+    /**
+     * Serializes the metadata list to JSON and uploads it as a single blob. Only the
+     * emit key determines the blob name; {@link TikaCoreProperties#SOURCE_PATH} is not
+     * consulted.
+     *
+     * @param emitKey      blob path, before prefix/extension are applied
+     * @param metadataList non-empty list of metadata objects to serialize
+     * @throws IOException           if the upload's stream handling fails
+     * @throws TikaEmitterException  (a {@link TikaException}) if the list is null or empty,
+     *                               or cannot be serialized to JSON
+     */
+    @Override
+    public void emit(String emitKey, List<Metadata> metadataList)
+            throws IOException, TikaEmitterException {
+        if (metadataList == null || metadataList.isEmpty()) {
+            throw new TikaEmitterException("metadata list must not be null or of size 0");
+        }
+        //TODO: estimate size of metadata list. Above a certain size,
+        //create a temp file?
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try (Writer writer = new OutputStreamWriter(bos, StandardCharsets.UTF_8)) {
+            JsonMetadataList.toJson(metadataList, writer);
+        } catch (IOException e) {
+            throw new TikaEmitterException("can't jsonify", e);
+        }
+        // presumably TikaInputStream.get(byte[], Metadata) records the byte length as
+        // Content-Length in this metadata, so the length-based upload path is taken
+        Metadata metadata = new Metadata();
+        emit(emitKey, TikaInputStream.get(bos.toByteArray(), metadata), metadata);
+    }
+
+    /**
+     * @param path         object path; prefix will be prepended
+     * @param is           inputStream to copy, if a TikaInputStream contains an underlying
+     *                     file, the client will upload the file; if a content-length is
+     *                     included in the metadata, the client will upload the stream with
+     *                     the content-length; otherwise, the client will copy the stream to
+     *                     a byte array and then upload.
+     * @param userMetadata this will be written to the az blob's metadata (first value per
+     *                     key only; Content-Length is skipped)
+     * @throws IOException if reading {@code is} into memory fails. Runtime exceptions
+     *                     thrown by the Azure client are not wrapped and propagate as-is.
+     */
+    @Override
+    public void emit(String path, InputStream is, Metadata userMetadata)
+            throws IOException, TikaEmitterException {
+        String lengthString = userMetadata.get(Metadata.CONTENT_LENGTH);
+        long length = -1;
+        if (lengthString != null) {
+            try {
+                length = Long.parseLong(lengthString);
+            } catch (NumberFormatException e) {
+                LOGGER.warn("Bad content-length: {}", lengthString);
+            }
+        }
+        if (is instanceof TikaInputStream && ((TikaInputStream) is).hasFile()) {
+            write(path, userMetadata, ((TikaInputStream) is).getPath());
+        } else if (length > -1) {
+            LOGGER.debug("relying on the content-length set in the metadata object: {}", length);
+            write(path, userMetadata, is, length);
+        } else {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            IOUtils.copy(is, bos);
+            write(path, userMetadata, bos.toByteArray());
+        }
+    }
+
+    /** Uploads {@code length} bytes from the stream, then applies the user metadata. */
+    private void write(String path, Metadata userMetadata, InputStream is, long length) {
+        BlobClient blobClient = getBlobClient(path);
+        blobClient.upload(is, length, overwriteExisting);
+        updateMetadata(blobClient, userMetadata);
+    }
+
+    /** Uploads a local file, then applies the user metadata. */
+    private void write(String path, Metadata userMetadata, Path file) {
+        BlobClient blobClient = getBlobClient(path);
+        blobClient.uploadFromFile(file.toAbsolutePath().toString(), overwriteExisting);
+        updateMetadata(blobClient, userMetadata);
+    }
+
+    /** Uploads an in-memory byte array, then applies the user metadata. */
+    private void write(String path, Metadata userMetadata, byte[] bytes) {
+        write(path, userMetadata, new ByteArrayInputStream(bytes), bytes.length);
+    }
+
+    /** Resolves the full blob name for {@code path} and returns a client for it. */
+    private BlobClient getBlobClient(String path) {
+        String actualPath = getActualPath(path);
+        LOGGER.debug("about to emit to target container: ({}) path:({})", container, actualPath);
+        return blobContainerClient.getBlobClient(actualPath);
+    }
+
+    /**
+     * Writes the user metadata (except Content-Length) to the blob's metadata.
+     * <p>
+     * Must run after the upload: the previous approach,
+     * {@code getProperties().getMetadata().put(..)}, queried the not-yet-existing blob
+     * and only mutated a local copy, so nothing ever reached the service.
+     * Only the first value of a multi-valued key is kept.
+     * NOTE(review): Azure restricts metadata names (C# identifier rules), so keys
+     * containing ':' or '-' may be rejected by the service.
+     */
+    private void updateMetadata(BlobClient blobClient, Metadata userMetadata) {
+        Map<String, String> blobMetadata = new HashMap<>();
+        for (String n : userMetadata.names()) {
+            if (n.equals(Metadata.CONTENT_LENGTH)) {
+                continue;
+            }
+            String[] vals = userMetadata.getValues(n);
+            if (vals.length == 0) {
+                continue;
+            }
+            if (vals.length > 1) {
+                LOGGER.warn("Can only write the first value for key {}. I see {} values.", n,
+                        vals.length);
+            }
+            blobMetadata.put(n, vals[0]);
+        }
+        if (!blobMetadata.isEmpty()) {
+            blobClient.setMetadata(blobMetadata);
+        }
+    }
+
+    /** Applies the configured prefix and file extension to {@code path}. */
+    private String getActualPath(final String path) {
+        String ret;
+        if (!StringUtils.isBlank(prefix)) {
+            ret = prefix + "/" + path;
+        } else {
+            ret = path;
+        }
+
+        if (!StringUtils.isBlank(fileExtension)) {
+            ret += "." + fileExtension;
+        }
+        return ret;
+    }
+
+    /** SAS token used to authenticate against the storage endpoint. */
+    @Field
+    public void setSasToken(String sasToken) {
+        this.sasToken = sasToken;
+    }
+
+    /** Blob service endpoint URL. */
+    @Field
+    public void setEndpoint(String endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    /** Name of the target container. */
+    @Field
+    public void setContainer(String container) {
+        this.container = container;
+    }
+
+    /** Whether uploads may overwrite an existing blob (default false). */
+    @Field
+    public void setOverwriteExisting(boolean overwriteExisting) {
+        this.overwriteExisting = overwriteExisting;
+    }
+
+    /** Prefix for every blob name; a single trailing "/" is stripped. */
+    @Field
+    public void setPrefix(String prefix) {
+        //strip final "/" if it exists
+        if (prefix.endsWith("/")) {
+            this.prefix = prefix.substring(0, prefix.length() - 1);
+        } else {
+            this.prefix = prefix;
+        }
+    }
+
+    /**
+     * If you want to customize the output file's file extension.
+     * Do not include the "."
+     *
+     * @param fileExtension extension to append; blank disables it
+     */
+    @Field
+    public void setFileExtension(String fileExtension) {
+        this.fileExtension = fileExtension;
+    }
+
+    /**
+     * This initializes the az blob container client
+     *
+     * @param params params to use for initialization
+     * @throws TikaConfigException
+     */
+    @Override
+    public void initialize(Map<String, Param> params) throws TikaConfigException {
+        //TODO -- allow authentication via other methods
+        blobServiceClient = new BlobServiceClientBuilder()
+                .endpoint(endpoint)
+                .credential(new AzureSasCredential(sasToken))
+                .buildClient();
+        blobContainerClient = blobServiceClient.getBlobContainerClient(container);
+    }
+
+    /** Fails if sasToken, endpoint or container is empty. */
+    @Override
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
+        mustNotBeEmpty("sasToken", this.sasToken);
+        mustNotBeEmpty("endpoint", this.endpoint);
+        mustNotBeEmpty("container", this.container);
+    }
+
+}
diff --git a/tika-pipes/tika-emitters/tika-emitter-az-blob/src/test/java/org/apache/tika/pipes/emitter/azblob/TestAZBlobEmitter.java b/tika-pipes/tika-emitters/tika-emitter-az-blob/src/test/java/org/apache/tika/pipes/emitter/azblob/TestAZBlobEmitter.java
new file mode 100644
index 0000000..4b93978
--- /dev/null
+++ b/tika-pipes/tika-emitters/tika-emitter-az-blob/src/test/java/org/apache/tika/pipes/emitter/azblob/TestAZBlobEmitter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.azblob;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+
+@Disabled("turn into an actual test")
+public class TestAZBlobEmitter {
+
+    /**
+     * Smoke test: loads the "az-blob" emitter from the test config and emits one
+     * metadata object with multi-valued keys. The bundled config leaves endpoint,
+     * container and sasToken empty, so they must be filled in before enabling this.
+     */
+    @Test
+    public void testBasic() throws Exception {
+        Path configPath = getConfig("tika-config-az-blob.xml");
+        Emitter emitter = EmitterManager.load(configPath).getEmitter("az-blob");
+
+        Metadata metadata = new Metadata();
+        metadata.set("k1", "v1");
+        metadata.add("k1", "v2");
+        metadata.set("k2", "v3");
+        metadata.add("k2", "v4");
+
+        List<Metadata> metadataList = new ArrayList<>();
+        metadataList.add(metadata);
+        emitter.emit("something-or-other/test-out", metadataList);
+    }
+
+    /** Resolves a file under the /config/ test-resource directory to a Path. */
+    private Path getConfig(String configFile) throws URISyntaxException {
+        String resourcePath = "/config/" + configFile;
+        return Paths.get(getClass().getResource(resourcePath).toURI());
+    }
+}
diff --git a/tika-pipes/tika-emitters/tika-emitter-az-blob/src/test/resources/config/tika-config-az-blob.xml b/tika-pipes/tika-emitters/tika-emitter-az-blob/src/test/resources/config/tika-config-az-blob.xml
new file mode 100644
index 0000000..99a2e2b
--- /dev/null
+++ b/tika-pipes/tika-emitters/tika-emitter-az-blob/src/test/resources/config/tika-config-az-blob.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<properties>
+ <emitters>
+ <emitter class="org.apache.tika.pipes.emitter.azblob.AZBlobEmitter">
+ <params>
+ <name>az-blob</name>
+ <!-- fill these in before running: all three must be non-empty (checked at initialization) -->
+ <endpoint></endpoint>
+ <container></container>
+ <sasToken></sasToken>
+ </params>
+ </emitter>
+ </emitters>
+</properties>
\ No newline at end of file
diff --git a/tika-pipes/tika-fetchers/pom.xml b/tika-pipes/tika-fetchers/pom.xml
index 2fe9699..b694a2a 100644
--- a/tika-pipes/tika-fetchers/pom.xml
+++ b/tika-pipes/tika-fetchers/pom.xml
@@ -35,6 +35,7 @@
<module>tika-fetcher-http</module>
<module>tika-fetcher-s3</module>
<module>tika-fetcher-gcs</module>
+ <module>tika-fetcher-az-blob</module>
</modules>
<scm>
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/pom.xml b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/pom.xml
new file mode 100644
index 0000000..44eddcf
--- /dev/null
+++ b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tika-fetchers</artifactId>
+    <groupId>org.apache.tika</groupId>
+    <version>2.4.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tika-fetcher-az-blob</artifactId>
+  <name>Apache Tika Azure Blob fetcher</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.azure</groupId>
+      <artifactId>azure-storage-blob</artifactId>
+      <version>12.14.4</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-serialization</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifestEntries>
+              <!-- must be unique per jar; matches this module's package org.apache.tika.pipes.fetcher.azblob -->
+              <Automatic-Module-Name>org.apache.tika.pipes.fetcher.azblob</Automatic-Module-Name>
+            </manifestEntries>
+          </archive>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>${maven.shade.version}</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>false</createDependencyReducedPom>
+              <!-- drop META-INF/* and LICENSE/NOTICE files from all shaded artifacts;
+                   this module's own LICENSE, NOTICE and DEPENDENCIES are re-added by the
+                   transformers below -->
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*</exclude>
+                    <exclude>LICENSE.txt</exclude>
+                    <exclude>NOTICE.txt</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                  <resource>META-INF/LICENSE</resource>
+                  <file>target/classes/META-INF/LICENSE</file>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                  <resource>META-INF/NOTICE</resource>
+                  <file>target/classes/META-INF/NOTICE</file>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                  <resource>META-INF/DEPENDENCIES</resource>
+                  <file>target/classes/META-INF/DEPENDENCIES</file>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <scm>
+    <tag>2.2.1-rc2</tag>
+  </scm>
+</project>
\ No newline at end of file
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java
new file mode 100644
index 0000000..5ecb6d9
--- /dev/null
+++ b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fetcher.azblob;
+
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
+
+/**
+ * Fetches files from Azure blob storage. Must set endpoint, sasToken and container via config.
+ */
+public class AZBlobFetcher extends AbstractFetcher implements Initializable {
+
+ private static String PREFIX = "az-blob";
+ private static final Logger LOGGER = LoggerFactory.getLogger(AZBlobFetcher.class);
+ private String sasToken;
+ private String container;
+ private String endpoint;
+ private boolean extractUserMetadata = true;
+ private BlobServiceClient blobServiceClient;
+ private BlobContainerClient blobContainerClient;
+ private boolean spoolToTemp = true;
+
+ @Override
+ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
+
+ LOGGER.debug("about to fetch fetchkey={} from endpoint ({})", fetchKey, endpoint);
+
+ try {
+ BlobClient blobClient = blobContainerClient.getBlobClient(fetchKey);
+ //TODO: extract other metadata, eg. md5, crc, etc.
+
+ if (extractUserMetadata) {
+ BlobProperties properties = blobClient.getProperties();
+ if (properties.getMetadata() != null) {
+ for (Map.Entry<String, String> e : properties.getMetadata().entrySet()) {
+ metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+ }
+ }
+ }
+ if (!spoolToTemp) {
+ return TikaInputStream.get(blobClient.openInputStream());
+ } else {
+ long start = System.currentTimeMillis();
+ TemporaryResources tmpResources = new TemporaryResources();
+ Path tmp = tmpResources.createTempFile();
+ try (OutputStream os = Files.newOutputStream(tmp)) {
+ blobClient.download(os);
+ }
+ TikaInputStream tis = TikaInputStream.get(tmp, metadata, tmpResources);
+ long elapsed = System.currentTimeMillis() - start;
+ LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
+ return tis;
+ }
+ } catch (Exception e) {
+ throw new IOException("az-blob storage exception", e);
+ }
+ }
+
+ @Field
+ public void setSpoolToTemp(boolean spoolToTemp) {
+ this.spoolToTemp = spoolToTemp;
+ }
+
+ @Field
+ public void setSasToken(String sasToken) {
+ this.sasToken = sasToken;
+ }
+
+ @Field
+ public void setEndpoint(String endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ @Field
+ public void setContainer(String container) {
+ this.container = container;
+ }
+ /**
+ * Whether or not to extract user metadata from the blob object
+ *
+ * @param extractUserMetadata
+ */
+ @Field
+ public void setExtractUserMetadata(boolean extractUserMetadata) {
+ this.extractUserMetadata = extractUserMetadata;
+ }
+
+
+ /**
+ * This initializes the az blob container client
+ *
+ * @param params params to use for initialization
+ * @throws TikaConfigException
+ */
+ @Override
+ public void initialize(Map<String, Param> params) throws TikaConfigException {
+ //TODO -- allow authentication via other methods
+ blobServiceClient = new BlobServiceClientBuilder()
+ .endpoint(endpoint)
+ .sasToken(sasToken)
+ .buildClient();
+ blobContainerClient = blobServiceClient.getBlobContainerClient(container);
+ }
+
+ @Override
+ public void checkInitialization(InitializableProblemHandler problemHandler)
+ throws TikaConfigException {
+ mustNotBeEmpty("sasToken", this.sasToken);
+ mustNotBeEmpty("endpoint", this.endpoint);
+ mustNotBeEmpty("container", this.container);
+ }
+}
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/test/java/org/apache/tika/pipes/fetcher/azblob/TestAZBlobFetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/test/java/org/apache/tika/pipes/fetcher/azblob/TestAZBlobFetcher.java
new file mode 100644
index 0000000..b499ac9
--- /dev/null
+++ b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/test/java/org/apache/tika/pipes/fetcher/azblob/TestAZBlobFetcher.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fetcher.azblob;
+
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.List;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.TikaTest;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.pipes.fetcher.Fetcher;
+import org.apache.tika.pipes.fetcher.FetcherManager;
+
+@Disabled("write actual unit tests")
+public class TestAZBlobFetcher extends TikaTest {
+
+    private static final String FETCH_STRING = "something-or-other/test-out.json";
+
+    /**
+     * Smoke test that does the following:
+     * <ul>
+     *   <li>loads the "az-blob" fetcher from /tika-config-az-blob.xml;</li>
+     *   <li>fetches {@link #FETCH_STRING};</li>
+     *   <li>parses the bytes as a JSON metadata list.</li>
+     * </ul>
+     * The referenced config leaves endpoint, container and sasToken empty, so real values
+     * presumably must be filled in before this can run.
+     */
+    @Test
+    public void testConfig() throws Exception {
+        FetcherManager manager = FetcherManager.load(Paths.get(
+                TestAZBlobFetcher.class.getResource("/tika-config-az-blob.xml").toURI()));
+        Fetcher azFetcher = manager.getFetcher("az-blob");
+        List<Metadata> parsed;
+        try (Reader jsonReader = new BufferedReader(new InputStreamReader(
+                azFetcher.fetch(FETCH_STRING, new Metadata()), StandardCharsets.UTF_8))) {
+            parsed = JsonMetadataList.fromJson(jsonReader);
+        }
+        debug(parsed);
+    }
+}
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/test/resources/tika-config-az-blob.xml b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/test/resources/tika-config-az-blob.xml
new file mode 100644
index 0000000..94dd697
--- /dev/null
+++ b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/test/resources/tika-config-az-blob.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<properties>
+ <fetchers>
+ <fetcher class="org.apache.tika.pipes.fetcher.azblob.AZBlobFetcher">
+ <params>
+ <name>az-blob</name>
+ <!-- placeholders: these must be set to non-empty values, or AZBlobFetcher's initialization check fails -->
+ <endpoint></endpoint>
+ <container></container>
+ <sasToken></sasToken>
+ </params>
+ </fetcher>
+ </fetchers>
+</properties>
\ No newline at end of file